Skip to content

OpenSearch Repository GCS Plugin Upgrade - Migration and Best Practices

Published: at 12:45 PM

OpenSearch Repository GCS Plugin Upgrade: Migration and Best Practices

This comprehensive guide covers the upgrade process for the OpenSearch repository-gcs plugin, which enables snapshot and restore functionality using Google Cloud Storage (GCS). We’ll explore migration strategies, compatibility considerations, and best practices for maintaining data integrity during the upgrade process.

Overview

The repository-gcs plugin allows OpenSearch to:

Understanding the Plugin Architecture

Plugin Components

  1. Core Repository Logic: Handles snapshot/restore operations
  2. GCS Client: Manages communication with Google Cloud Storage
  3. Authentication Module: Handles GCP credentials and service accounts
  4. Compression Engine: Optimizes storage usage
  5. Metadata Manager: Tracks snapshot state and indices

Version Compatibility Matrix

OpenSearch VersionPlugin VersionGCS Client VersionNotes
1.0.x - 1.2.x1.0.01.117.0Legacy support
1.3.x1.3.01.117.0Stable
2.0.x - 2.4.x2.0.02.3.0Breaking changes
2.5.x - 2.9.x2.5.02.3.0Current stable
2.10.x+2.10.02.8.0Latest features

Pre-Upgrade Assessment

1. Current State Analysis

#!/bin/bash
# check-gcs-plugin-status.sh

OPENSEARCH_URL="https://localhost:9200"
AUTH="-u admin:admin"

echo "=== OpenSearch GCS Plugin Status ==="

# Check installed plugins
echo "\nInstalled plugins:"
curl -s $AUTH "$OPENSEARCH_URL/_cat/plugins?v" | grep repository-gcs

# Check plugin version details
echo "\nPlugin details:"
curl -s $AUTH "$OPENSEARCH_URL/_nodes/plugins?filter_path=nodes.*.plugins" | \
  jq '.nodes[].plugins[] | select(.name == "repository-gcs")'

# List existing repositories
echo "\nGCS repositories:"
curl -s $AUTH "$OPENSEARCH_URL/_snapshot?pretty" | \
  jq '.[] | select(.type == "gcs") | {name: .type, settings: .settings}'

# Check active snapshots
echo "\nActive snapshots:"
for repo in $(curl -s $AUTH "$OPENSEARCH_URL/_snapshot" | jq -r 'keys[]'); do
  echo "Repository: $repo"
  curl -s $AUTH "$OPENSEARCH_URL/_snapshot/$repo/_current?pretty"
done

# Check snapshot statistics
echo "\nSnapshot statistics:"
curl -s $AUTH "$OPENSEARCH_URL/_snapshot/_stats?pretty"

2. Backup Current Configuration

#!/bin/bash
# backup-gcs-config.sh

BACKUP_DIR="/backup/opensearch-gcs/$(date +%Y%m%d-%H%M%S)"
mkdir -p "$BACKUP_DIR"

# Export repository settings
echo "Backing up repository configurations..."
curl -s $AUTH "$OPENSEARCH_URL/_snapshot" > "$BACKUP_DIR/repositories.json"

# Export snapshot list for each repository
for repo in $(curl -s $AUTH "$OPENSEARCH_URL/_snapshot" | jq -r 'keys[]'); do
  echo "Backing up snapshots for repository: $repo"
  curl -s $AUTH "$OPENSEARCH_URL/_snapshot/$repo/_all" > "$BACKUP_DIR/snapshots-$repo.json"
done

# Backup plugin configuration
echo "Backing up plugin configuration..."
cp /etc/opensearch/opensearch.yml "$BACKUP_DIR/"
cp -r /etc/opensearch/repository-gcs/ "$BACKUP_DIR/" 2>/dev/null || true

# Document current version
echo "Documenting current versions..."
cat > "$BACKUP_DIR/version-info.txt" <<EOF
OpenSearch Version: $(curl -s $AUTH "$OPENSEARCH_URL" | jq -r '.version.number')
Plugin Version: $(curl -s $AUTH "$OPENSEARCH_URL/_nodes/plugins" | jq -r '.nodes[].plugins[] | select(.name == "repository-gcs") | .version')
Backup Date: $(date)
EOF

echo "Backup completed: $BACKUP_DIR"

Upgrade Process

Step 1: Prepare for Upgrade

Disable Snapshot Operations

# Disable SLM policies
curl -X POST "$OPENSEARCH_URL/_slm/stop" $AUTH

# Wait for active snapshots to complete
while true; do
  active=$(curl -s $AUTH "$OPENSEARCH_URL/_snapshot/_status" | jq '.snapshots | length')
  if [ "$active" -eq 0 ]; then
    echo "No active snapshots. Safe to proceed."
    break
  fi
  echo "Waiting for $active active snapshots to complete..."
  sleep 30
done

Create Final Backup

# Create a final snapshot before upgrade
FINAL_SNAPSHOT="pre-upgrade-$(date +%Y%m%d-%H%M%S)"

curl -X PUT "$OPENSEARCH_URL/_snapshot/gcs-backup/$FINAL_SNAPSHOT?wait_for_completion=true" \
  $AUTH \
  -H "Content-Type: application/json" \
  -d '{
    "indices": "*",
    "include_global_state": true,
    "metadata": {
      "reason": "Pre-upgrade backup",
      "upgrade_from": "'$(curl -s $AUTH "$OPENSEARCH_URL/_nodes/plugins" | jq -r '.nodes[].plugins[] | select(.name == "repository-gcs") | .version')'"
    }
  }'

Step 2: Remove Old Plugin

#!/bin/bash
# remove-old-plugin.sh

# Stop OpenSearch
sudo systemctl stop opensearch

# Remove the old plugin
sudo -u opensearch /usr/share/opensearch/bin/opensearch-plugin remove repository-gcs

# Clean up any residual files
sudo rm -rf /usr/share/opensearch/plugins/repository-gcs/
sudo rm -rf /var/lib/opensearch/repository-gcs/

# Clear plugin cache
sudo rm -rf /tmp/opensearch-*

Step 3: Install New Plugin Version

#!/bin/bash
# install-new-plugin.sh

# Set the target version
PLUGIN_VERSION="2.10.0"
OPENSEARCH_VERSION="2.10.0"

# Install the new plugin
sudo -u opensearch /usr/share/opensearch/bin/opensearch-plugin install \
  "repository-gcs:${PLUGIN_VERSION}"

# Verify installation
/usr/share/opensearch/bin/opensearch-plugin list | grep repository-gcs

Step 4: Configure New Plugin

Update OpenSearch Configuration

# /etc/opensearch/opensearch.yml

# GCS Repository Plugin Settings
gcs:
  client:
    default:
      # Authentication method (service account recommended)
      credentials:
        file: "/etc/opensearch/gcs-credentials.json"

      # Connection settings
      connect_timeout: "30s"
      read_timeout: "60s"

      # Retry settings
      max_retries: 3
      retry_interval: "1s"

      # Performance settings
      chunk_size: "100mb"
      compress: true

    # Additional client for different project
    backup:
      project_id: "backup-project-123"
      credentials:
        file: "/etc/opensearch/gcs-backup-credentials.json"
      endpoint: "https://storage.googleapis.com"

# Repository settings
repositories:
  gcs:
    # Concurrent operations
    max_restore_bytes_per_sec: "100mb"
    max_snapshot_bytes_per_sec: "40mb"

    # Chunk settings
    chunk_size: "1gb"
    compress: true

    # Cache settings
    cache:
      enabled: true
      size: "10gb"
      expire_after_write: "30m"

Service Account Configuration

// /etc/opensearch/gcs-credentials.json
{
  "type": "service_account",
  "project_id": "your-project-id",
  "private_key_id": "key-id",
  "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
  "client_email": "opensearch-backup@your-project-id.iam.gserviceaccount.com",
  "client_id": "1234567890",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/opensearch-backup%40your-project-id.iam.gserviceaccount.com"
}

Set proper permissions:

sudo chown opensearch:opensearch /etc/opensearch/gcs-credentials.json
sudo chmod 600 /etc/opensearch/gcs-credentials.json

Step 5: Start OpenSearch and Verify

# Start OpenSearch
sudo systemctl start opensearch

# Wait for cluster to be ready
while ! curl -s $AUTH "$OPENSEARCH_URL/_cluster/health" | grep -q '"status":"green"\|"status":"yellow"'; do
  echo "Waiting for cluster to be ready..."
  sleep 5
done

# Verify plugin is loaded
curl -s $AUTH "$OPENSEARCH_URL/_cat/plugins?v" | grep repository-gcs

Step 6: Reconfigure Repositories

#!/bin/bash
# reconfigure-repositories.sh

# Update existing repository with new settings
curl -X PUT "$OPENSEARCH_URL/_snapshot/gcs-backup" \
  $AUTH \
  -H "Content-Type: application/json" \
  -d '{
    "type": "gcs",
    "settings": {
      "bucket": "opensearch-snapshots",
      "client": "default",
      "base_path": "snapshots/prod",
      "chunk_size": "1gb",
      "compress": true,
      "max_restore_bytes_per_sec": "100mb",
      "max_snapshot_bytes_per_sec": "40mb",
      "readonly": false,
      "metadata": {
        "cluster_name": "production",
        "upgraded_at": "'$(date -u +%Y-%m-%dT%H:%M:%SZ)'"
      }
    }
  }'

# Verify repository
curl -X POST "$OPENSEARCH_URL/_snapshot/gcs-backup/_verify" $AUTH

Migration Strategies

Strategy 1: In-Place Upgrade

Suitable for minor version upgrades with backward compatibility.

#!/bin/bash
# in-place-upgrade.sh

# 1. Create verification snapshot
VERIFY_SNAPSHOT="verify-$(date +%Y%m%d-%H%M%S)"
curl -X PUT "$OPENSEARCH_URL/_snapshot/gcs-backup/$VERIFY_SNAPSHOT" \
  $AUTH \
  -H "Content-Type: application/json" \
  -d '{
    "indices": ".opensearch",
    "include_global_state": false
  }'

# 2. Test restore capability
curl -X POST "$OPENSEARCH_URL/_snapshot/gcs-backup/$VERIFY_SNAPSHOT/_restore" \
  $AUTH \
  -H "Content-Type: application/json" \
  -d '{
    "indices": ".opensearch",
    "rename_pattern": "(.+)",
    "rename_replacement": "test_$1"
  }'

# 3. Verify and cleanup
curl -X DELETE "$OPENSEARCH_URL/test_*" $AUTH
curl -X DELETE "$OPENSEARCH_URL/_snapshot/gcs-backup/$VERIFY_SNAPSHOT" $AUTH

Strategy 2: Blue-Green Migration

For major version upgrades or when downtime must be minimized.

#!/bin/bash
# blue-green-migration.sh

# Setup new cluster with new plugin version
NEW_CLUSTER="https://new-cluster:9200"

# 1. Configure repository on new cluster
curl -X PUT "$NEW_CLUSTER/_snapshot/gcs-migration" \
  $AUTH \
  -H "Content-Type: application/json" \
  -d '{
    "type": "gcs",
    "settings": {
      "bucket": "opensearch-snapshots",
      "client": "default",
      "base_path": "migration/temp",
      "readonly": false
    }
  }'

# 2. Create snapshot on old cluster
MIGRATION_SNAPSHOT="migration-$(date +%Y%m%d-%H%M%S)"
curl -X PUT "$OPENSEARCH_URL/_snapshot/gcs-backup/$MIGRATION_SNAPSHOT?wait_for_completion=false" \
  $AUTH \
  -H "Content-Type: application/json" \
  -d '{
    "indices": "*",
    "include_global_state": true
  }'

# 3. Monitor snapshot progress
while true; do
  STATUS=$(curl -s $AUTH "$OPENSEARCH_URL/_snapshot/gcs-backup/$MIGRATION_SNAPSHOT/_status" | \
    jq -r '.snapshots[0].state')
  if [ "$STATUS" = "SUCCESS" ]; then
    break
  fi
  echo "Snapshot status: $STATUS"
  sleep 30
done

# 4. Restore on new cluster
curl -X POST "$NEW_CLUSTER/_snapshot/gcs-migration/$MIGRATION_SNAPSHOT/_restore" \
  $AUTH \
  -H "Content-Type: application/json" \
  -d '{
    "indices": "*",
    "include_global_state": false,
    "index_settings": {
      "index.number_of_replicas": 0
    }
  }'

Strategy 3: Incremental Migration

For large datasets where full snapshot/restore is impractical.

#!/usr/bin/env python3
# incremental-migration.py

import requests
import json
import time
from datetime import datetime, timedelta

class IncrementalMigration:
    def __init__(self, source_url, target_url, auth):
        self.source = source_url
        self.target = target_url
        self.auth = auth
        self.bucket = "opensearch-snapshots"

    def setup_repositories(self):
        """Setup GCS repositories on both clusters"""
        repo_config = {
            "type": "gcs",
            "settings": {
                "bucket": self.bucket,
                "client": "default",
                "base_path": "incremental",
                "chunk_size": "1gb",
                "compress": True
            }
        }

        # Setup on source
        requests.put(
            f"{self.source}/_snapshot/gcs-incremental",
            auth=self.auth,
            json=repo_config
        )

        # Setup on target
        requests.put(
            f"{self.target}/_snapshot/gcs-incremental",
            auth=self.auth,
            json=repo_config
        )

    def get_indices_by_age(self, days_old):
        """Get indices older than specified days"""
        response = requests.get(
            f"{self.source}/_cat/indices?format=json",
            auth=self.auth
        )

        indices = []
        cutoff_date = datetime.now() - timedelta(days=days_old)

        for index in response.json():
            # Parse index date from name (assuming pattern like logs-2024.01.15)
            try:
                date_str = index['index'].split('-')[-1]
                index_date = datetime.strptime(date_str, '%Y.%m.%d')

                if index_date < cutoff_date:
                    indices.append(index['index'])
            except:
                continue

        return indices

    def migrate_indices_batch(self, indices, batch_name):
        """Migrate a batch of indices"""
        snapshot_name = f"batch-{batch_name}-{int(time.time())}"

        # Create snapshot
        print(f"Creating snapshot {snapshot_name} for {len(indices)} indices...")
        response = requests.put(
            f"{self.source}/_snapshot/gcs-incremental/{snapshot_name}",
            auth=self.auth,
            json={
                "indices": ",".join(indices),
                "include_global_state": False,
                "metadata": {
                    "batch": batch_name,
                    "index_count": len(indices)
                }
            }
        )

        # Wait for completion
        self.wait_for_snapshot(snapshot_name)

        # Restore on target
        print(f"Restoring snapshot {snapshot_name}...")
        response = requests.post(
            f"{self.target}/_snapshot/gcs-incremental/{snapshot_name}/_restore",
            auth=self.auth,
            json={
                "indices": ",".join(indices),
                "include_global_state": False,
                "index_settings": {
                    "index.number_of_replicas": 0
                }
            }
        )

        return snapshot_name

    def wait_for_snapshot(self, snapshot_name):
        """Wait for snapshot to complete"""
        while True:
            response = requests.get(
                f"{self.source}/_snapshot/gcs-incremental/{snapshot_name}",
                auth=self.auth
            )

            snapshot = response.json()['snapshots'][0]
            state = snapshot['state']

            if state == 'SUCCESS':
                print(f"Snapshot {snapshot_name} completed successfully")
                break
            elif state == 'FAILED':
                raise Exception(f"Snapshot {snapshot_name} failed")
            else:
                print(f"Snapshot {snapshot_name} state: {state}")
                time.sleep(30)

    def run_incremental_migration(self):
        """Run the incremental migration process"""
        self.setup_repositories()

        # Migrate in batches by age
        age_ranges = [
            (365, "very-old"),    # > 1 year
            (180, "old"),         # 6-12 months
            (90, "medium"),       # 3-6 months
            (30, "recent"),       # 1-3 months
            (7, "current"),       # 1 week - 1 month
            (0, "latest")         # < 1 week
        ]

        for days, batch_name in age_ranges:
            indices = self.get_indices_by_age(days)
            if indices:
                print(f"\nMigrating {batch_name} indices ({len(indices)} total)...")
                self.migrate_indices_batch(indices[:50], batch_name)  # Batch of 50

                # Verify migration
                self.verify_indices(indices[:50])

    def verify_indices(self, indices):
        """Verify indices were migrated successfully"""
        for index in indices:
            source_count = requests.get(
                f"{self.source}/{index}/_count",
                auth=self.auth
            ).json()['count']

            target_count = requests.get(
                f"{self.target}/{index}/_count",
                auth=self.auth
            ).json()['count']

            if source_count != target_count:
                print(f"WARNING: Count mismatch for {index}: {source_count} vs {target_count}")
            else:
                print(f"Verified {index}: {source_count} documents")

# Run migration
if __name__ == "__main__":
    migration = IncrementalMigration(
        source_url="https://old-cluster:9200",
        target_url="https://new-cluster:9200",
        auth=('admin', 'admin')
    )
    migration.run_incremental_migration()

Performance Optimization

1. GCS Client Tuning

# Optimized GCS client configuration
gcs:
  client:
    default:
      # Connection pool settings
      connection_pool_size: 50
      connection_timeout: "30s"
      socket_timeout: "60s"

      # Retry configuration
      max_retries: 5
      retry_interval: "1s"
      retry_multiplier: 2
      max_retry_interval: "30s"

      # Performance settings
      chunk_size: "256mb" # Larger chunks for better throughput
      request_compression: true
      response_compression: true

      # HTTP settings
      http:
        max_connections: 50
        max_connections_per_route: 10
        connection_request_timeout: "10s"
        keep_alive_strategy: "default"

2. Snapshot Performance

#!/bin/bash
# optimize-snapshot-performance.sh

# Configure snapshot settings
curl -X PUT "$OPENSEARCH_URL/_cluster/settings" \
  $AUTH \
  -H "Content-Type: application/json" \
  -d '{
    "persistent": {
      "repositories.gcs.chunk_size": "1gb",
      "repositories.gcs.compress": true,
      "repositories.gcs.application_name": "opensearch-prod",
      "snapshot.max_restore_bytes_per_sec": "200mb",
      "snapshot.max_snapshot_bytes_per_sec": "100mb"
    },
    "transient": {
      "indices.recovery.max_bytes_per_sec": "200mb",
      "cluster.routing.allocation.node_concurrent_recoveries": 4
    }
  }'

# Create optimized repository
curl -X PUT "$OPENSEARCH_URL/_snapshot/gcs-optimized" \
  $AUTH \
  -H "Content-Type: application/json" \
  -d '{
    "type": "gcs",
    "settings": {
      "bucket": "opensearch-snapshots",
      "client": "default",
      "base_path": "optimized",
      "chunk_size": "1gb",
      "compress": true,
      "max_restore_bytes_per_sec": "200mb",
      "max_snapshot_bytes_per_sec": "100mb",
      "application_name": "opensearch-optimized"
    }
  }'

3. Parallel Operations

#!/usr/bin/env python3
# parallel-snapshot-operations.py

import concurrent.futures
import requests
import json
from datetime import datetime

class ParallelSnapshotManager:
    def __init__(self, opensearch_url, auth):
        self.url = opensearch_url
        self.auth = auth
        self.max_workers = 5

    def create_snapshot_parallel(self, indices_groups, repository):
        """Create snapshots in parallel for different index groups"""
        timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []

            for i, indices in enumerate(indices_groups):
                snapshot_name = f"parallel-{timestamp}-group{i}"
                future = executor.submit(
                    self._create_snapshot,
                    repository,
                    snapshot_name,
                    indices
                )
                futures.append((snapshot_name, future))

            # Wait for all snapshots to complete
            results = []
            for snapshot_name, future in futures:
                try:
                    result = future.result(timeout=3600)  # 1 hour timeout
                    results.append({
                        'snapshot': snapshot_name,
                        'status': 'success',
                        'details': result
                    })
                except Exception as e:
                    results.append({
                        'snapshot': snapshot_name,
                        'status': 'failed',
                        'error': str(e)
                    })

            return results

    def _create_snapshot(self, repository, snapshot_name, indices):
        """Create a single snapshot"""
        response = requests.put(
            f"{self.url}/_snapshot/{repository}/{snapshot_name}",
            auth=self.auth,
            json={
                "indices": ",".join(indices),
                "include_global_state": False,
                "metadata": {
                    "created_by": "parallel_snapshot_manager",
                    "index_count": len(indices)
                }
            }
        )

        if response.status_code != 200:
            raise Exception(f"Failed to create snapshot: {response.text}")

        # Wait for completion
        return self._wait_for_snapshot(repository, snapshot_name)

    def _wait_for_snapshot(self, repository, snapshot_name):
        """Wait for snapshot completion"""
        while True:
            response = requests.get(
                f"{self.url}/_snapshot/{repository}/{snapshot_name}",
                auth=self.auth
            )

            if response.status_code != 200:
                raise Exception(f"Failed to get snapshot status: {response.text}")

            snapshot = response.json()['snapshots'][0]
            if snapshot['state'] == 'SUCCESS':
                return snapshot
            elif snapshot['state'] == 'FAILED':
                raise Exception(f"Snapshot failed: {snapshot.get('failures', 'Unknown error')}")

            time.sleep(10)

Monitoring and Validation

1. Health Monitoring Script

#!/bin/bash
# monitor-gcs-plugin.sh

while true; do
    clear
    echo "=== OpenSearch GCS Plugin Monitor ==="
    echo "Time: $(date)"
    echo ""

    # Plugin status
    echo "Plugin Status:"
    curl -s $AUTH "$OPENSEARCH_URL/_nodes/stats/repositories" | \
      jq '.nodes[].repositories'
    echo ""

    # Active operations
    echo "Active Snapshot Operations:"
    curl -s $AUTH "$OPENSEARCH_URL/_snapshot/_status" | \
      jq '.snapshots[] | {snapshot: .snapshot, state: .state, progress: .shards_stats.done}'
    echo ""

    # Repository stats
    echo "Repository Statistics:"
    curl -s $AUTH "$OPENSEARCH_URL/_snapshot/_stats" | \
      jq '.stats'
    echo ""

    # GCS metrics
    echo "GCS Client Metrics:"
    curl -s $AUTH "$OPENSEARCH_URL/_nodes/stats/repositories?include_repository_stats=true" | \
      jq '.nodes[].repositories.gcs'

    sleep 30
done

2. Validation Suite

#!/usr/bin/env python3
# validate-gcs-upgrade.py

import requests
import json
import hashlib
from datetime import datetime

class GCSPluginValidator:
    def __init__(self, opensearch_url, auth):
        self.url = opensearch_url
        self.auth = auth
        self.validation_results = []

    def run_all_validations(self):
        """Run complete validation suite"""
        print("Running GCS Plugin Validation Suite...")
        print("=" * 50)

        self.validate_plugin_installation()
        self.validate_repository_access()
        self.validate_snapshot_operations()
        self.validate_restore_operations()
        self.validate_performance_metrics()
        self.validate_security_settings()

        self.print_results()

    def validate_plugin_installation(self):
        """Validate plugin is properly installed"""
        try:
            response = requests.get(
                f"{self.url}/_cat/plugins?format=json",
                auth=self.auth
            )

            plugins = response.json()
            gcs_plugin = next((p for p in plugins if p['component'] == 'repository-gcs'), None)

            if gcs_plugin:
                self.validation_results.append({
                    'test': 'Plugin Installation',
                    'status': 'PASS',
                    'details': f"Version {gcs_plugin['version']} installed"
                })
            else:
                self.validation_results.append({
                    'test': 'Plugin Installation',
                    'status': 'FAIL',
                    'details': 'Plugin not found'
                })
        except Exception as e:
            self.validation_results.append({
                'test': 'Plugin Installation',
                'status': 'ERROR',
                'details': str(e)
            })

    def validate_repository_access(self):
        """Validate GCS repository access"""
        test_repo = "gcs-validation-test"

        try:
            # Create test repository
            response = requests.put(
                f"{self.url}/_snapshot/{test_repo}",
                auth=self.auth,
                json={
                    "type": "gcs",
                    "settings": {
                        "bucket": "opensearch-snapshots",
                        "base_path": "validation-test",
                        "readonly": False
                    }
                }
            )

            if response.status_code == 200:
                # Verify repository
                verify_response = requests.post(
                    f"{self.url}/_snapshot/{test_repo}/_verify",
                    auth=self.auth
                )

                if verify_response.status_code == 200:
                    self.validation_results.append({
                        'test': 'Repository Access',
                        'status': 'PASS',
                        'details': 'Successfully created and verified repository'
                    })
                else:
                    self.validation_results.append({
                        'test': 'Repository Access',
                        'status': 'FAIL',
                        'details': f"Verification failed: {verify_response.text}"
                    })

                # Cleanup
                requests.delete(f"{self.url}/_snapshot/{test_repo}", auth=self.auth)
            else:
                self.validation_results.append({
                    'test': 'Repository Access',
                    'status': 'FAIL',
                    'details': f"Failed to create repository: {response.text}"
                })

        except Exception as e:
            self.validation_results.append({
                'test': 'Repository Access',
                'status': 'ERROR',
                'details': str(e)
            })

    def validate_snapshot_operations(self):
        """Validate snapshot creation and management"""
        # Implementation continues...
        pass

    def print_results(self):
        """Print validation results"""
        print("\nValidation Results:")
        print("=" * 50)

        for result in self.validation_results:
            status_color = {
                'PASS': '\033[92m',
                'FAIL': '\033[91m',
                'ERROR': '\033[93m'
            }.get(result['status'], '\033[0m')

            print(f"{status_color}{result['status']}\033[0m - {result['test']}")
            print(f"  Details: {result['details']}")
            print()

# Run validation
if __name__ == "__main__":
    validator = GCSPluginValidator(
        opensearch_url="https://localhost:9200",
        auth=('admin', 'admin')
    )
    validator.run_all_validations()

Troubleshooting

Common Issues and Solutions

1. Authentication Failures

# Check service account permissions
gcloud projects get-iam-policy YOUR_PROJECT_ID \
  --flatten="bindings[].members" \
  --filter="bindings.members:serviceAccount:opensearch-backup@*"

# Required permissions
cat > gcs-permissions.yaml <<EOF
title: "OpenSearch GCS Access"
description: "Permissions for OpenSearch GCS plugin"
stage: "GA"
includedPermissions:
- storage.buckets.get
- storage.buckets.list
- storage.objects.create
- storage.objects.delete
- storage.objects.get
- storage.objects.list
- storage.multipartUploads.abort
- storage.multipartUploads.create
- storage.multipartUploads.list
- storage.multipartUploads.listParts
EOF

# Create custom role
gcloud iam roles create opensearchGcsAccess \
  --project=YOUR_PROJECT_ID \
  --file=gcs-permissions.yaml

# Grant role to service account
gcloud projects add-iam-policy-binding YOUR_PROJECT_ID \
  --member="serviceAccount:opensearch-backup@YOUR_PROJECT_ID.iam.gserviceaccount.com" \
  --role="projects/YOUR_PROJECT_ID/roles/opensearchGcsAccess"

2. Connection Timeouts

# Increase timeouts in opensearch.yml
gcs:
  client:
    default:
      connect_timeout: "60s" # Increase from default 30s
      read_timeout: "120s" # Increase from default 60s

      # Retry settings for transient failures
      max_retries: 10
      retry_interval: "5s"

      # Connection pool
      connection_pool_size: 100
      connection_pool_timeout: "30s"

3. Memory Issues

# Increase JVM heap for repository operations
echo "-Xms4g" >> /etc/opensearch/jvm.options
echo "-Xmx4g" >> /etc/opensearch/jvm.options
echo "-XX:+UseG1GC" >> /etc/opensearch/jvm.options
echo "-XX:MaxGCPauseMillis=200" >> /etc/opensearch/jvm.options

# Configure memory circuit breaker
curl -X PUT "$OPENSEARCH_URL/_cluster/settings" \
  $AUTH \
  -H "Content-Type: application/json" \
  -d '{
    "persistent": {
      "indices.breaker.total.limit": "85%",
      "indices.breaker.request.limit": "60%",
      "indices.breaker.fielddata.limit": "40%"
    }
  }'

4. Slow Snapshot Performance

# Diagnose slow snapshots
curl -X GET "$OPENSEARCH_URL/_snapshot/_status?pretty" $AUTH

# Check thread pool stats
curl -X GET "$OPENSEARCH_URL/_nodes/stats/thread_pool?pretty" $AUTH | \
  jq '.nodes[].thread_pool.snapshot'

# Optimize thread pools
curl -X PUT "$OPENSEARCH_URL/_cluster/settings" \
  $AUTH \
  -H "Content-Type: application/json" \
  -d '{
    "persistent": {
      "thread_pool.snapshot.size": 10,
      "thread_pool.snapshot.queue_size": 1000
    }
  }'

Best Practices

1. Security Configuration

# Secure GCS configuration
gcs:
  client:
    default:
      # Use service account instead of API keys
      credentials:
        file: "/etc/opensearch/gcs-sa.json"

      # Enable request signing
      signing_enabled: true

      # Use private service endpoint
      endpoint: "https://storage.googleapis.com"

      # Enable SSL/TLS verification
      protocol: "https"
      verify_ssl: true

2. Backup Strategy

// Comprehensive backup policy
{
  "schedule": "0 30 1 * * ?",
  "name": "<nightly-snap-{now/d}>",
  "repository": "gcs-backup",
  "config": {
    "indices": ["*"],
    "ignore_unavailable": true,
    "include_global_state": false,
    "partial": false,
    "metadata": {
      "policy": "nightly",
      "retention_days": 30
    }
  },
  "retention": {
    "expire_after": "30d",
    "min_count": 7,
    "max_count": 90
  }
}

3. Monitoring and Alerting

# Setup monitoring alerts
curl -X PUT "$OPENSEARCH_URL/_plugins/_alerting/monitors" \
  $AUTH \
  -H "Content-Type: application/json" \
  -d '{
    "type": "monitor",
    "name": "gcs-snapshot-failures",
    "enabled": true,
    "schedule": {
      "period": {"interval": 5, "unit": "MINUTES"}
    },
    "inputs": [{
      "search": {
        "indices": [".opensearch-notifications-*"],
        "query": {
          "bool": {
            "must": [
              {"match": {"event.category": "snapshot"}},
              {"match": {"event.outcome": "failure"}},
              {"range": {"@timestamp": {"gte": "now-5m"}}}
            ]
          }
        }
      }
    }],
    "triggers": [{
      "name": "snapshot-failed",
      "severity": "1",
      "condition": {
        "script": {
          "source": "ctx.results[0].hits.total.value > 0"
        }
      },
      "actions": [{
        "name": "notify-ops",
        "destination_id": "ops-channel",
        "message_template": {
          "source": "GCS Snapshot failed: {{ctx.results[0].hits.hits[0]._source.event.reason}}"
        }
      }]
    }]
  }'

4. Cost Optimization

#!/usr/bin/env python3
# optimize-gcs-costs.py

import requests
import json
from datetime import datetime, timedelta

class GCSCostOptimizer:
    def __init__(self, opensearch_url, auth):
        self.url = opensearch_url
        self.auth = auth

    def analyze_snapshot_costs(self):
        """Analyze snapshot storage costs"""
        # Get all snapshots
        response = requests.get(
            f"{self.url}/_snapshot/_all",
            auth=self.auth
        )

        total_size = 0
        old_snapshots = []

        for repo_name, repo_data in response.json().items():
            snapshots_response = requests.get(
                f"{self.url}/_snapshot/{repo_name}/_all",
                auth=self.auth
            )

            for snapshot in snapshots_response.json()['snapshots']:
                # Calculate age
                start_time = datetime.fromtimestamp(snapshot['start_time_in_millis'] / 1000)
                age = datetime.now() - start_time

                # Track old snapshots
                if age > timedelta(days=90):
                    old_snapshots.append({
                        'repository': repo_name,
                        'snapshot': snapshot['snapshot'],
                        'age_days': age.days,
                        'size_gb': snapshot.get('total_size', 0) / (1024**3)
                    })

                total_size += snapshot.get('total_size', 0)

        # Calculate costs (example: $0.02 per GB per month for standard storage)
        monthly_cost = (total_size / (1024**3)) * 0.02

        print(f"\nSnapshot Storage Analysis:")
        print(f"Total Size: {total_size / (1024**3):.2f} GB")
        print(f"Estimated Monthly Cost: ${monthly_cost:.2f}")
        print(f"\nOld Snapshots (>90 days): {len(old_snapshots)}")

        # Recommendations
        if old_snapshots:
            potential_savings = sum(s['size_gb'] for s in old_snapshots) * 0.02
            print(f"Potential Monthly Savings: ${potential_savings:.2f}")
            print("\nRecommended Deletions:")
            for snapshot in sorted(old_snapshots, key=lambda x: x['age_days'], reverse=True)[:10]:
                print(f"- {snapshot['repository']}/{snapshot['snapshot']} ({snapshot['age_days']} days, {snapshot['size_gb']:.2f} GB)")

    def implement_lifecycle_policy(self):
        """Implement cost-optimized lifecycle policy"""
        policy = {
            "schedule": "0 0 2 * * ?",  # Daily at 2 AM
            "name": "<cost-optimized-{now/d}>",
            "repository": "gcs-backup",
            "config": {
                "indices": ["*"],
                "ignore_unavailable": True,
                "include_global_state": False,
                "partial": False
            },
            "retention": {
                "expire_after": "30d",  # Keep for 30 days
                "min_count": 7,         # Always keep at least 7
                "max_count": 30         # Never keep more than 30
            }
        }

        # Apply policy
        response = requests.put(
            f"{self.url}/_slm/policy/cost-optimized",
            auth=self.auth,
            json=policy
        )

        if response.status_code == 200:
            print("Cost-optimized lifecycle policy implemented successfully")
        else:
            print(f"Failed to implement policy: {response.text}")

# Run cost analysis
if __name__ == "__main__":
    optimizer = GCSCostOptimizer(
        opensearch_url="https://localhost:9200",
        auth=('admin', 'admin')
    )
    optimizer.analyze_snapshot_costs()
    optimizer.implement_lifecycle_policy()

Conclusion

Upgrading the OpenSearch repository-gcs plugin requires careful planning and execution. Key considerations include:

  1. Compatibility: Ensure plugin version matches OpenSearch version
  2. Data Safety: Always backup before upgrading
  3. Testing: Validate functionality in non-production environments
  4. Performance: Optimize settings for your workload
  5. Security: Use service accounts and proper IAM roles
  6. Monitoring: Implement comprehensive monitoring and alerting
  7. Cost Management: Regular cleanup and lifecycle policies

By following this guide and implementing the provided scripts and configurations, you can successfully upgrade your repository-gcs plugin while maintaining data integrity and optimizing performance.

Additional Resources