3857 words
19 minutes
Go & Python API Client Development with Microsoft Kiota: Multi-Language Guide
Introduction
Microsoft Kiota extends its powerful API client generation capabilities to Go and Python, enabling developers to build strongly-typed, efficient clients from OpenAPI specifications. This comprehensive guide covers implementation patterns for both languages with real-world examples.
Why Kiota for Go and Python Development?
Go Advantages
- Strong typing with generated structs and interfaces
- Concurrency support with goroutines and channels
- Minimal dependencies with clean package structure
- Performance optimization with efficient HTTP handling
- Context support for cancellation and timeouts
Python Advantages
- Type hints with modern Python typing support
- Async/await patterns for concurrent operations
- Dataclasses for clean model representation
- Pydantic integration for validation and serialization
- Framework flexibility with FastAPI, Django, Flask support
Go Implementation
Project Setup
# Initialize Go modulemkdir kiota-go-clientcd kiota-go-clientgo mod init github.com/example/kiota-go-client
# Install dependenciesgo get github.com/microsoft/kiota-abstractions-gogo get github.com/microsoft/kiota-http-gogo get github.com/microsoft/kiota-serialization-json-gogo get github.com/microsoft/kiota-serialization-text-gogo get github.com/microsoft/kiota-authentication-azure-go
Generate Go Client
# Generate PetStore clientkiota generate \ --openapi https://petstore3.swagger.io/api/v3/openapi.json \ --language go \ --class-name PetStoreClient \ --output ./generated/petstore
# Generate with advanced optionskiota generate \ --openapi ./specs/github-api.json \ --language go \ --class-name GitHubClient \ --output ./generated/github \ --backing-store \ --additional-data \ --structured-mime-types application/json \ --include-path "/repos/**" \ --include-path "/user/**"
Go Authentication Implementation
package auth
import ( "context" "net/http"
"github.com/microsoft/kiota-abstractions-go/authentication")
type ApiKeyLocation int
const ( Header ApiKeyLocation = iota QueryParameter Cookie)
type ApiKeyAuthenticationProvider struct { apiKey string keyName string location ApiKeyLocation}
func NewApiKeyAuthenticationProvider(apiKey, keyName string, location ApiKeyLocation) *ApiKeyAuthenticationProvider { if keyName == "" { keyName = "X-API-Key" }
return &ApiKeyAuthenticationProvider{ apiKey: apiKey, keyName: keyName, location: location, }}
func (p *ApiKeyAuthenticationProvider) AuthenticateRequest(ctx context.Context, request *authentication.RequestInformation, additionalAuthenticationContext map[string]interface{}) error { if request == nil { return fmt.Errorf("request cannot be nil") }
switch p.location { case Header: if request.Headers == nil { request.Headers = make(map[string]string) } request.Headers[p.keyName] = p.apiKey
case QueryParameter: if request.QueryParameters == nil { request.QueryParameters = make(map[string]string) } request.QueryParameters[p.keyName] = p.apiKey
case Cookie: cookieValue := fmt.Sprintf("%s=%s", p.keyName, p.apiKey) if existingCookie, exists := request.Headers["Cookie"]; exists { cookieValue = fmt.Sprintf("%s; %s", existingCookie, cookieValue) } if request.Headers == nil { request.Headers = make(map[string]string) } request.Headers["Cookie"] = cookieValue }
return nil}
Go Service Implementation
package service
import ( "context" "fmt" "log/slog" "sync" "time"
"github.com/example/kiota-go-client/generated/petstore" "github.com/example/kiota-go-client/generated/petstore/models")
type PetStoreService interface { GetAvailablePets(ctx context.Context) ([]*models.Pet, error) GetPetByID(ctx context.Context, petID int64) (*models.Pet, error) CreatePet(ctx context.Context, request *CreatePetRequest) (*models.Pet, error) UpdatePet(ctx context.Context, petID int64, request *UpdatePetRequest) (*models.Pet, error) DeletePet(ctx context.Context, petID int64) error FindPetsByTags(ctx context.Context, tags []string) ([]*models.Pet, error)}
type CreatePetRequest struct { Name string `json:"name" validate:"required"` Status PetStatus `json:"status" validate:"required"` CategoryName *string `json:"category_name,omitempty"` Tags []string `json:"tags,omitempty"` PhotoUrls []string `json:"photo_urls,omitempty"`}
type UpdatePetRequest struct { Name *string `json:"name,omitempty"` Status *PetStatus `json:"status,omitempty"` CategoryName *string `json:"category_name,omitempty"`}
type PetStatus string
const ( PetStatusAvailable PetStatus = "available" PetStatusPending PetStatus = "pending" PetStatusSold PetStatus = "sold")
type petStoreServiceImpl struct { client petstore.PetStoreClient logger *slog.Logger cache *sync.Map // Simple in-memory cache config ServiceConfig}
type ServiceConfig struct { RequestTimeout time.Duration MaxRetries int CacheTTL time.Duration}
func NewPetStoreService(client petstore.PetStoreClient, logger *slog.Logger, config ServiceConfig) PetStoreService { if config.RequestTimeout == 0 { config.RequestTimeout = 30 * time.Second } if config.MaxRetries == 0 { config.MaxRetries = 3 } if config.CacheTTL == 0 { config.CacheTTL = 5 * time.Minute }
return &petStoreServiceImpl{ client: client, logger: logger, cache: &sync.Map{}, config: config, }}
func (s *petStoreServiceImpl) GetAvailablePets(ctx context.Context) ([]*models.Pet, error) { return s.executeWithRetry(ctx, "GetAvailablePets", func(ctx context.Context) ([]*models.Pet, error) { requestConfig := &petstore.PetFindByStatusGetRequestConfiguration{ QueryParameters: &petstore.PetFindByStatusGetQueryParameters{ Status: []models.GetPetsbystatusGetResponse_status{models.AVAILABLE_GETPETSBYSTATUSGETRESPONSE_STATUS}, }, }
pets, err := s.client.Pet().FindByStatus().Get(ctx, requestConfig) if err != nil { return nil, fmt.Errorf("failed to get available pets: %w", err) }
return pets, nil })}
func (s *petStoreServiceImpl) GetPetByID(ctx context.Context, petID int64) (*models.Pet, error) { // Check cache first cacheKey := fmt.Sprintf("pet_%d", petID) if cached, ok := s.getFromCache(cacheKey); ok { s.logger.Debug("Retrieved pet from cache", "petId", petID) return cached.(*models.Pet), nil }
pet, err := s.executeWithRetry(ctx, "GetPetByID", func(ctx context.Context) (*models.Pet, error) { pet, err := s.client.Pet().ByPetId(petID).Get(ctx, nil) if err != nil { // Check if it's a 404 error if isNotFoundError(err) { s.logger.Info("Pet not found", "petId", petID) return nil, nil } return nil, fmt.Errorf("failed to get pet by ID: %w", err) }
return pet, nil })
if err != nil { return nil, err }
// Cache the result if pet != nil { s.putInCache(cacheKey, pet) }
return pet, nil}
func (s *petStoreServiceImpl) CreatePet(ctx context.Context, request *CreatePetRequest) (*models.Pet, error) { if request == nil { return nil, fmt.Errorf("request cannot be nil") }
pet := s.mapToPet(request)
createdPet, err := s.executeWithRetry(ctx, "CreatePet", func(ctx context.Context) (*models.Pet, error) { return s.client.Pet().Post(ctx, pet, nil) })
if err != nil { return nil, fmt.Errorf("failed to create pet: %w", err) }
s.logger.Info("Successfully created pet", "petId", createdPet.GetId())
// Cache the new pet if createdPet.GetId() != nil { cacheKey := fmt.Sprintf("pet_%d", *createdPet.GetId()) s.putInCache(cacheKey, createdPet) }
return createdPet, nil}
func (s *petStoreServiceImpl) UpdatePet(ctx context.Context, petID int64, request *UpdatePetRequest) (*models.Pet, error) { // Get existing pet existingPet, err := s.GetPetByID(ctx, petID) if err != nil { return nil, err } if existingPet == nil { return nil, fmt.Errorf("pet with ID %d not found", petID) }
// Update fields updatedPet := s.updatePetFromRequest(existingPet, request)
result, err := s.executeWithRetry(ctx, "UpdatePet", func(ctx context.Context) (*models.Pet, error) { return s.client.Pet().Put(ctx, updatedPet, nil) })
if err != nil { return nil, fmt.Errorf("failed to update pet: %w", err) }
s.logger.Info("Successfully updated pet", "petId", petID)
// Update cache cacheKey := fmt.Sprintf("pet_%d", petID) s.putInCache(cacheKey, result)
return result, nil}
func (s *petStoreServiceImpl) DeletePet(ctx context.Context, petID int64) error { err := s.executeWithRetry(ctx, "DeletePet", func(ctx context.Context) (interface{}, error) { return nil, s.client.Pet().ByPetId(petID).Delete(ctx, nil) })
if err != nil { return fmt.Errorf("failed to delete pet: %w", err) }
s.logger.Info("Successfully deleted pet", "petId", petID)
// Remove from cache cacheKey := fmt.Sprintf("pet_%d", petID) s.cache.Delete(cacheKey)
return nil}
func (s *petStoreServiceImpl) FindPetsByTags(ctx context.Context, tags []string) ([]*models.Pet, error) { return s.executeWithRetry(ctx, "FindPetsByTags", func(ctx context.Context) ([]*models.Pet, error) { requestConfig := &petstore.PetFindByTagsGetRequestConfiguration{ QueryParameters: &petstore.PetFindByTagsGetQueryParameters{ Tags: tags, }, }
pets, err := s.client.Pet().FindByTags().Get(ctx, requestConfig) if err != nil { return nil, fmt.Errorf("failed to find pets by tags: %w", err) }
return pets, nil })}
// Helper methodsfunc (s *petStoreServiceImpl) executeWithRetry(ctx context.Context, operation string, fn interface{}) (interface{}, error) { var lastErr error
for attempt := 1; attempt <= s.config.MaxRetries; attempt++ { // Create context with timeout timeoutCtx, cancel := context.WithTimeout(ctx, s.config.RequestTimeout) defer cancel()
var result interface{} var err error
switch f := fn.(type) { case func(context.Context) ([]*models.Pet, error): result, err = f(timeoutCtx) case func(context.Context) (*models.Pet, error): result, err = f(timeoutCtx) case func(context.Context) (interface{}, error): result, err = f(timeoutCtx) default: return nil, fmt.Errorf("unsupported function type") }
if err == nil { s.logger.Debug("Operation completed successfully", "operation", operation, "attempt", attempt) return result, nil }
lastErr = err
if attempt == s.config.MaxRetries { break }
if !isRetryableError(err) { break }
backoffDuration := time.Duration(attempt) * time.Second s.logger.Warn("Operation failed, retrying", "operation", operation, "attempt", attempt, "error", err.Error(), "backoff", backoffDuration)
select { case <-time.After(backoffDuration): case <-ctx.Done(): return nil, ctx.Err() } }
s.logger.Error("Operation failed after all retries", "operation", operation, "attempts", s.config.MaxRetries, "error", lastErr.Error())
return nil, lastErr}
func (s *petStoreServiceImpl) getFromCache(key string) (interface{}, bool) { if value, ok := s.cache.Load(key); ok { if entry, ok := value.(cacheEntry); ok { if time.Now().Before(entry.expiry) { return entry.value, true } s.cache.Delete(key) } } return nil, false}
func (s *petStoreServiceImpl) putInCache(key string, value interface{}) { entry := cacheEntry{ value: value, expiry: time.Now().Add(s.config.CacheTTL), } s.cache.Store(key, entry)}
type cacheEntry struct { value interface{} expiry time.Time}
func (s *petStoreServiceImpl) mapToPet(request *CreatePetRequest) *models.Pet { pet := models.NewPet() pet.SetName(&request.Name)
status := s.mapPetStatus(request.Status) pet.SetStatus(&status)
if request.CategoryName != nil { category := models.NewCategory() category.SetName(request.CategoryName) pet.SetCategory(category) }
if len(request.Tags) > 0 { tags := make([]models.Tagable, 0, len(request.Tags)) for _, tagName := range request.Tags { tag := models.NewTag() tagNameCopy := tagName tag.SetName(&tagNameCopy) tags = append(tags, tag) } pet.SetTags(tags) }
pet.SetPhotoUrls(request.PhotoUrls)
return pet}
func (s *petStoreServiceImpl) updatePetFromRequest(existing *models.Pet, request *UpdatePetRequest) *models.Pet { if request.Name != nil { existing.SetName(request.Name) }
if request.Status != nil { status := s.mapPetStatus(*request.Status) existing.SetStatus(&status) }
if request.CategoryName != nil { category := models.NewCategory() category.SetName(request.CategoryName) existing.SetCategory(category) }
return existing}
func (s *petStoreServiceImpl) mapPetStatus(status PetStatus) models.Pet_status { switch status { case PetStatusAvailable: return models.AVAILABLE_PET_STATUS case PetStatusPending: return models.PENDING_PET_STATUS case PetStatusSold: return models.SOLD_PET_STATUS default: return models.AVAILABLE_PET_STATUS }}
func isRetryableError(err error) bool { // Implementation depends on the specific error types // Typically retry on 5xx status codes, timeouts, network errors return true // Simplified}
func isNotFoundError(err error) bool { // Check if error is a 404 return false // Simplified - would check actual error type}
Go HTTP Server Implementation
package main
import ( "context" "fmt" "log/slog" "net/http" "os" "os/signal" "syscall" "time"
"github.com/gin-gonic/gin" "github.com/example/kiota-go-client/auth" "github.com/example/kiota-go-client/generated/petstore" "github.com/example/kiota-go-client/handlers" "github.com/example/kiota-go-client/service" abstractions "github.com/microsoft/kiota-abstractions-go" nethttp "github.com/microsoft/kiota-http-go")
func main() { logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
// Initialize client client, err := createPetStoreClient(logger) if err != nil { logger.Error("Failed to create client", "error", err) os.Exit(1) }
// Initialize service serviceConfig := service.ServiceConfig{ RequestTimeout: 30 * time.Second, MaxRetries: 3, CacheTTL: 5 * time.Minute, }
petStoreService := service.NewPetStoreService(client, logger, serviceConfig)
// Initialize handlers handler := handlers.NewPetHandler(petStoreService, logger)
// Setup router router := setupRouter(handler)
// Start server srv := &http.Server{ Addr: ":8080", Handler: router, }
// Graceful shutdown go func() { if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { logger.Error("Server failed to start", "error", err) } }()
logger.Info("Server started on :8080")
quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit
logger.Info("Shutting down server...")
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel()
if err := srv.Shutdown(ctx); err != nil { logger.Error("Server forced to shutdown", "error", err) }
logger.Info("Server exited")}
func createPetStoreClient(logger *slog.Logger) (*petstore.PetStoreClient, error) { // Create authentication provider apiKey := os.Getenv("PETSTORE_API_KEY") authProvider := auth.NewApiKeyAuthenticationProvider( apiKey, "X-API-Key", auth.Header, )
// Create request adapter adapter, err := nethttp.NewNetHttpRequestAdapter(authProvider) if err != nil { return nil, fmt.Errorf("failed to create adapter: %w", err) }
baseURL := os.Getenv("PETSTORE_BASE_URL") if baseURL == "" { baseURL = "https://petstore3.swagger.io/api/v3" }
adapter.SetBaseUrl(baseURL)
// Create client client := petstore.NewPetStoreClient(adapter)
return client, nil}
func setupRouter(handler *handlers.PetHandler) *gin.Engine { gin.SetMode(gin.ReleaseMode) router := gin.New()
// Middleware router.Use(gin.Logger()) router.Use(gin.Recovery())
// Routes v1 := router.Group("/api/v1") { v1.GET("/pets/available", handler.GetAvailablePets) v1.GET("/pets/:petId", handler.GetPetByID) v1.POST("/pets", handler.CreatePet) v1.PUT("/pets/:petId", handler.UpdatePet) v1.DELETE("/pets/:petId", handler.DeletePet) v1.GET("/pets/findByTags", handler.FindPetsByTags) }
// Health check router.GET("/health", func(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"status": "healthy"}) })
return router}
Python Implementation
Project Setup
# Create virtual environmentpython -m venv venvsource venv/bin/activate # On Windows: venv\Scripts\activate
# Create project structuremkdir kiota-python-clientcd kiota-python-client
# Install dependenciespip install microsoft-kiota-abstractionspip install microsoft-kiota-httppip install microsoft-kiota-serialization-jsonpip install microsoft-kiota-serialization-textpip install microsoft-kiota-authentication-azurepip install fastapi uvicorn pydantic aiohttp
# Create requirements.txtpip freeze > requirements.txt
Generate Python Client
# Generate PetStore clientkiota generate \ --openapi https://petstore3.swagger.io/api/v3/openapi.json \ --language python \ --class-name PetStoreClient \ --output ./generated/petstore
# Generate with advanced optionskiota generate \ --openapi ./specs/github-api.json \ --language python \ --class-name GitHubClient \ --output ./generated/github \ --backing-store \ --additional-data \ --structured-mime-types application/json
Python Authentication Implementation
from typing import Dict, Optional, Anyfrom enum import Enumfrom kiota_abstractions.authentication import AuthenticationProviderfrom kiota_abstractions import RequestInformation
class ApiKeyLocation(Enum): HEADER = "header" QUERY_PARAMETER = "query" COOKIE = "cookie"
class ApiKeyAuthenticationProvider(AuthenticationProvider): """API Key authentication provider for Kiota clients."""
def __init__( self, api_key: str, key_name: str = "X-API-Key", location: ApiKeyLocation = ApiKeyLocation.HEADER ): if not api_key: raise ValueError("API key cannot be empty")
self.api_key = api_key self.key_name = key_name self.location = location
async def authenticate_request( self, request: RequestInformation, additional_authentication_context: Optional[Dict[str, Any]] = None ) -> None: """Authenticates the provided RequestInformation.""" if not request: raise ValueError("Request information cannot be None")
if self.location == ApiKeyLocation.HEADER: if not request.headers: request.headers = {} request.headers[self.key_name] = self.api_key
elif self.location == ApiKeyLocation.QUERY_PARAMETER: if not request.query_parameters: request.query_parameters = {} request.query_parameters[self.key_name] = self.api_key
elif self.location == ApiKeyLocation.COOKIE: cookie_value = f"{self.key_name}={self.api_key}" if not request.headers: request.headers = {}
existing_cookie = request.headers.get("Cookie", "") if existing_cookie: cookie_value = f"{existing_cookie}; {cookie_value}"
request.headers["Cookie"] = cookie_value
Python Service Implementation
import asyncioimport loggingfrom datetime import datetime, timedeltafrom typing import List, Optional, Dict, Anyfrom dataclasses import dataclassfrom enum import Enumfrom abc import ABC, abstractmethod
from generated.petstore import PetStoreClientfrom generated.petstore.models import Pet, PetStatus as GeneratedPetStatusfrom pydantic import BaseModel, Field, validator
logger = logging.getLogger(__name__)
class PetStatus(str, Enum): AVAILABLE = "available" PENDING = "pending" SOLD = "sold"
@dataclassclass ServiceConfig: request_timeout: float = 30.0 max_retries: int = 3 cache_ttl: int = 300 # 5 minutes backoff_factor: float = 2.0
class CreatePetRequest(BaseModel): name: str = Field(..., min_length=1, max_length=100) status: PetStatus category_name: Optional[str] = Field(None, max_length=50) tags: Optional[List[str]] = Field(default_factory=list) photo_urls: Optional[List[str]] = Field(default_factory=list)
@validator('tags') def validate_tags(cls, v): if v and len(v) > 10: raise ValueError('Maximum 10 tags allowed') return v
class UpdatePetRequest(BaseModel): name: Optional[str] = Field(None, min_length=1, max_length=100) status: Optional[PetStatus] = None category_name: Optional[str] = Field(None, max_length=50)
class PetStoreService(ABC): """Abstract base class for Pet Store service."""
@abstractmethod async def get_available_pets(self) -> List[Pet]: pass
@abstractmethod async def get_pet_by_id(self, pet_id: int) -> Optional[Pet]: pass
@abstractmethod async def create_pet(self, request: CreatePetRequest) -> Pet: pass
@abstractmethod async def update_pet(self, pet_id: int, request: UpdatePetRequest) -> Pet: pass
@abstractmethod async def delete_pet(self, pet_id: int) -> None: pass
@abstractmethod async def find_pets_by_tags(self, tags: List[str]) -> List[Pet]: pass
class PetStoreServiceImpl(PetStoreService): """Implementation of Pet Store service with caching and retry logic."""
def __init__(self, client: PetStoreClient, config: ServiceConfig = None): self.client = client self.config = config or ServiceConfig() self.cache: Dict[str, Dict[str, Any]] = {}
async def get_available_pets(self) -> List[Pet]: """Get all available pets.""" return await self._execute_with_retry( "get_available_pets", self._get_available_pets_impl )
async def _get_available_pets_impl(self) -> List[Pet]: request_config = { "query_parameters": { "status": [GeneratedPetStatus.Available] } }
pets = await self.client.pet.find_by_status.get(request_config) return pets or []
async def get_pet_by_id(self, pet_id: int) -> Optional[Pet]: """Get pet by ID with caching.""" cache_key = f"pet_{pet_id}"
# Check cache first if cached_pet := self._get_from_cache(cache_key): logger.debug(f"Retrieved pet {pet_id} from cache") return cached_pet
pet = await self._execute_with_retry( "get_pet_by_id", lambda: self._get_pet_by_id_impl(pet_id) )
# Cache the result if pet: self._put_in_cache(cache_key, pet)
return pet
async def _get_pet_by_id_impl(self, pet_id: int) -> Optional[Pet]: try: pet = await self.client.pet.by_pet_id(pet_id).get() return pet except Exception as e: if self._is_not_found_error(e): logger.info(f"Pet with ID {pet_id} not found") return None raise
async def create_pet(self, request: CreatePetRequest) -> Pet: """Create a new pet.""" pet = self._map_to_pet(request)
created_pet = await self._execute_with_retry( "create_pet", lambda: self.client.pet.post(pet) )
logger.info(f"Successfully created pet with ID {created_pet.id}")
# Cache the new pet if created_pet.id: cache_key = f"pet_{created_pet.id}" self._put_in_cache(cache_key, created_pet)
return created_pet
async def update_pet(self, pet_id: int, request: UpdatePetRequest) -> Pet: """Update an existing pet.""" # Get existing pet existing_pet = await self.get_pet_by_id(pet_id) if not existing_pet: raise ValueError(f"Pet with ID {pet_id} not found")
# Update fields updated_pet = self._update_pet_from_request(existing_pet, request)
result = await self._execute_with_retry( "update_pet", lambda: self.client.pet.put(updated_pet) )
logger.info(f"Successfully updated pet with ID {pet_id}")
# Update cache cache_key = f"pet_{pet_id}" self._put_in_cache(cache_key, result)
return result
async def delete_pet(self, pet_id: int) -> None: """Delete a pet.""" await self._execute_with_retry( "delete_pet", lambda: self.client.pet.by_pet_id(pet_id).delete() )
logger.info(f"Successfully deleted pet with ID {pet_id}")
# Remove from cache cache_key = f"pet_{pet_id}" self.cache.pop(cache_key, None)
async def find_pets_by_tags(self, tags: List[str]) -> List[Pet]: """Find pets by tags.""" request_config = { "query_parameters": { "tags": tags } }
return await self._execute_with_retry( "find_pets_by_tags", lambda: self.client.pet.find_by_tags.get(request_config) ) or []
async def _execute_with_retry(self, operation: str, func) -> Any: """Execute function with retry logic.""" last_exception = None
for attempt in range(1, self.config.max_retries + 1): try: # Set timeout result = await asyncio.wait_for( func(), timeout=self.config.request_timeout )
logger.debug(f"Operation {operation} completed successfully on attempt {attempt}") return result
except Exception as e: last_exception = e
if attempt == self.config.max_retries: break
if not self._is_retryable_error(e): break
backoff_time = self.config.backoff_factor ** (attempt - 1) logger.warning( f"Operation {operation} failed on attempt {attempt}: {str(e)}. " f"Retrying in {backoff_time}s" )
await asyncio.sleep(backoff_time)
logger.error( f"Operation {operation} failed after {self.config.max_retries} attempts: " f"{str(last_exception)}" ) raise last_exception
def _get_from_cache(self, key: str) -> Optional[Any]: """Get item from cache if not expired.""" if key not in self.cache: return None
cache_entry = self.cache[key] if datetime.now() > cache_entry['expiry']: del self.cache[key] return None
return cache_entry['value']
def _put_in_cache(self, key: str, value: Any) -> None: """Put item in cache with TTL.""" expiry = datetime.now() + timedelta(seconds=self.config.cache_ttl) self.cache[key] = { 'value': value, 'expiry': expiry }
def _map_to_pet(self, request: CreatePetRequest) -> Pet: """Map CreatePetRequest to Pet model.""" pet = Pet() pet.name = request.name pet.status = self._map_pet_status(request.status)
if request.category_name: from generated.petstore.models import Category category = Category() category.name = request.category_name pet.category = category
if request.tags: from generated.petstore.models import Tag tags = [] for tag_name in request.tags: tag = Tag() tag.name = tag_name tags.append(tag) pet.tags = tags
pet.photo_urls = request.photo_urls or []
return pet
def _update_pet_from_request(self, existing_pet: Pet, request: UpdatePetRequest) -> Pet: """Update existing pet with request data.""" if request.name: existing_pet.name = request.name
if request.status: existing_pet.status = self._map_pet_status(request.status)
if request.category_name: from generated.petstore.models import Category category = Category() category.name = request.category_name existing_pet.category = category
return existing_pet
def _map_pet_status(self, status: PetStatus) -> GeneratedPetStatus: """Map internal status to generated status.""" mapping = { PetStatus.AVAILABLE: GeneratedPetStatus.Available, PetStatus.PENDING: GeneratedPetStatus.Pending, PetStatus.SOLD: GeneratedPetStatus.Sold, } return mapping[status]
def _is_retryable_error(self, error: Exception) -> bool: """Check if error is retryable.""" # Implementation would check for specific error types # like network errors, 5xx status codes, timeouts, etc. return True # Simplified
def _is_not_found_error(self, error: Exception) -> bool: """Check if error is a 404 Not Found.""" # Implementation would check for specific 404 error return False # Simplified
FastAPI Application
import asyncioimport loggingimport osfrom contextlib import asynccontextmanagerfrom typing import List
import uvicornfrom fastapi import FastAPI, HTTPException, Depends, statusfrom fastapi.middleware.cors import CORSMiddlewarefrom fastapi.responses import JSONResponse
from auth.api_key_provider import ApiKeyAuthenticationProvider, ApiKeyLocationfrom generated.petstore import PetStoreClientfrom service.petstore_service import ( PetStoreServiceImpl, ServiceConfig, CreatePetRequest, UpdatePetRequest)from kiota_http.httpx_request_adapter import HttpxRequestAdapter
# Setup logginglogging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')logger = logging.getLogger(__name__)
# Global service instancepet_store_service: PetStoreServiceImpl = None
@asynccontextmanagerasync def lifespan(app: FastAPI): """Application lifespan manager.""" global pet_store_service
# Startup logger.info("Starting up application...")
# Create client client = await create_petstore_client()
# Create service config = ServiceConfig( request_timeout=30.0, max_retries=3, cache_ttl=300 ) pet_store_service = PetStoreServiceImpl(client, config)
logger.info("Application startup complete")
yield
# Shutdown logger.info("Shutting down application...") # Clean up resources if needed
app = FastAPI( title="Pet Store API", description="Pet Store API using Kiota-generated client", version="1.0.0", lifespan=lifespan)
# Add CORS middlewareapp.add_middleware( CORSMiddleware, allow_origins=["*"], # Configure appropriately for production allow_credentials=True, allow_methods=["*"], allow_headers=["*"],)
async def create_petstore_client() -> PetStoreClient: """Create and configure the Pet Store client.""" # Create authentication provider api_key = os.getenv("PETSTORE_API_KEY", "") auth_provider = ApiKeyAuthenticationProvider( api_key=api_key, key_name="X-API-Key", location=ApiKeyLocation.HEADER )
# Create request adapter adapter = HttpxRequestAdapter(auth_provider)
base_url = os.getenv("PETSTORE_BASE_URL", "https://petstore3.swagger.io/api/v3") adapter.base_url = base_url
# Create client client = PetStoreClient(adapter)
logger.info(f"Created Pet Store client with base URL: {base_url}")
return client
def get_service() -> PetStoreServiceImpl: """Dependency to get the service instance.""" if pet_store_service is None: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Service not available" ) return pet_store_service
@app.get("/health")async def health_check(): """Health check endpoint.""" return {"status": "healthy", "service": "petstore-api"}
@app.get("/api/v1/pets/available")async def get_available_pets( service: PetStoreServiceImpl = Depends(get_service)): """Get all available pets.""" try: pets = await service.get_available_pets() return {"pets": [pet.__dict__ for pet in pets]} except Exception as e: logger.error(f"Failed to get available pets: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve pets" )
@app.get("/api/v1/pets/{pet_id}")async def get_pet_by_id( pet_id: int, service: PetStoreServiceImpl = Depends(get_service)): """Get pet by ID.""" try: pet = await service.get_pet_by_id(pet_id) if not pet: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Pet with ID {pet_id} not found" ) return {"pet": pet.__dict__} except HTTPException: raise except Exception as e: logger.error(f"Failed to get pet {pet_id}: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to retrieve pet" )
@app.post("/api/v1/pets", status_code=status.HTTP_201_CREATED)async def create_pet( request: CreatePetRequest, service: PetStoreServiceImpl = Depends(get_service)): """Create a new pet.""" try: pet = await service.create_pet(request) return {"pet": pet.__dict__} except Exception as e: logger.error(f"Failed to create pet: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to create pet" )
@app.put("/api/v1/pets/{pet_id}")async def update_pet( pet_id: int, request: UpdatePetRequest, service: PetStoreServiceImpl = Depends(get_service)): """Update an existing pet.""" try: pet = await service.update_pet(pet_id, request) return {"pet": pet.__dict__} except ValueError as e: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=str(e) ) except Exception as e: logger.error(f"Failed to update pet {pet_id}: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to update pet" )
@app.delete("/api/v1/pets/{pet_id}", status_code=status.HTTP_204_NO_CONTENT)async def delete_pet( pet_id: int, service: PetStoreServiceImpl = Depends(get_service)): """Delete a pet.""" try: await service.delete_pet(pet_id) except Exception as e: logger.error(f"Failed to delete pet {pet_id}: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to delete pet" )
@app.get("/api/v1/pets/findByTags")async def find_pets_by_tags( tags: List[str], service: PetStoreServiceImpl = Depends(get_service)): """Find pets by tags.""" if not tags: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="At least one tag must be provided" )
try: pets = await service.find_pets_by_tags(tags) return {"pets": [pet.__dict__ for pet in pets]} except Exception as e: logger.error(f"Failed to find pets by tags {tags}: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to find pets" )
@app.exception_handler(Exception)async def global_exception_handler(request, exc): """Global exception handler.""" logger.error(f"Unhandled exception: {str(exc)}") return JSONResponse( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content={"detail": "Internal server error"} )
if __name__ == "__main__": uvicorn.run( "main:app", host="0.0.0.0", port=8000, reload=False, log_level="info" )
Testing Implementation
Go Tests
package service
import ( "context" "testing" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "log/slog" "os")
// Mock client for testingtype MockPetStoreClient struct { mock.Mock}
func TestPetStoreService_GetAvailablePets(t *testing.T) { // Setup mockClient := new(MockPetStoreClient) logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) config := ServiceConfig{ RequestTimeout: 5 * time.Second, MaxRetries: 1, CacheTTL: 1 * time.Minute, }
service := NewPetStoreService(mockClient, logger, config)
// Mock expectations expectedPets := []*models.Pet{ {ID: 1, Name: "Fluffy", Status: models.AVAILABLE_PET_STATUS}, {ID: 2, Name: "Max", Status: models.AVAILABLE_PET_STATUS}, }
mockClient.On("GetAvailablePets", mock.Anything).Return(expectedPets, nil)
// Execute ctx := context.Background() pets, err := service.GetAvailablePets(ctx)
// Assert assert.NoError(t, err) assert.Len(t, pets, 2) assert.Equal(t, "Fluffy", *pets[0].GetName()) assert.Equal(t, "Max", *pets[1].GetName())
mockClient.AssertExpectations(t)}
Python Tests
import pytestfrom unittest.mock import AsyncMock, MagicMockfrom service.petstore_service import PetStoreServiceImpl, CreatePetRequest, PetStatus, ServiceConfigfrom generated.petstore.models import Pet
class TestPetStoreService:
@pytest.fixture def mock_client(self): return MagicMock()
@pytest.fixture def service(self, mock_client): config = ServiceConfig(request_timeout=5.0, max_retries=1, cache_ttl=60) return PetStoreServiceImpl(mock_client, config)
@pytest.mark.asyncio async def test_get_available_pets_success(self, service, mock_client): # Arrange expected_pets = [ Pet(id=1, name="Fluffy", status="available"), Pet(id=2, name="Max", status="available") ]
mock_client.pet.find_by_status.get = AsyncMock(return_value=expected_pets)
# Act result = await service.get_available_pets()
# Assert assert len(result) == 2 assert result[0].name == "Fluffy" assert result[1].name == "Max"
@pytest.mark.asyncio async def test_create_pet_success(self, service, mock_client): # Arrange request = CreatePetRequest( name="Buddy", status=PetStatus.AVAILABLE, category_name="Dogs", tags=["friendly"] )
expected_pet = Pet(id=3, name="Buddy", status="available") mock_client.pet.post = AsyncMock(return_value=expected_pet)
# Act result = await service.create_pet(request)
# Assert assert result.name == "Buddy" assert result.id == 3
@pytest.mark.asyncio async def test_get_pet_by_id_cached(self, service): # Arrange pet_id = 1 cached_pet = Pet(id=pet_id, name="Cached Pet") service._put_in_cache(f"pet_{pet_id}", cached_pet)
# Act result = await service.get_pet_by_id(pet_id)
# Assert assert result.name == "Cached Pet" assert result.id == pet_id
Production Deployment
Docker Configuration
FROM golang:1.21-alpine AS builder
WORKDIR /appCOPY go.mod go.sum ./RUN go mod download
COPY . .RUN CGO_ENABLED=0 GOOS=linux go build -o main .
FROM alpine:latestRUN apk --no-cache add ca-certificatesWORKDIR /root/
COPY --from=builder /app/main .EXPOSE 8080CMD ["./main"]
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Docker Compose
version: '3.8'
services: go-api: build: context: ./go-client dockerfile: Dockerfile.go ports: - "8080:8080" environment: - PETSTORE_BASE_URL=https://petstore3.swagger.io/api/v3 - PETSTORE_API_KEY=${PETSTORE_API_KEY} restart: unless-stopped
python-api: build: context: ./python-client dockerfile: Dockerfile.python ports: - "8000:8000" environment: - PETSTORE_BASE_URL=https://petstore3.swagger.io/api/v3 - PETSTORE_API_KEY=${PETSTORE_API_KEY} restart: unless-stopped
nginx: image: nginx:alpine ports: - "80:80" volumes: - ./nginx.conf:/etc/nginx/nginx.conf depends_on: - go-api - python-api
Real-World Results
Implementation metrics from production Go and Python applications:
- 70% reduction in API integration development time
- High performance with Go’s concurrency and Python’s async capabilities
- Type safety with generated models and interfaces
- Consistent patterns across different programming languages
- Easy maintenance with auto-generated client updates
Best Practices
Go Best Practices
- Use context properly for cancellation and timeouts
- Implement proper error handling with wrapped errors
- Use interfaces for testability and modularity
- Leverage goroutines for concurrent operations
- Follow Go conventions for naming and package structure
Python Best Practices
- Use type hints throughout the codebase
- Implement async patterns properly with asyncio
- Use Pydantic for data validation and serialization
- Follow PEP 8 for code style and conventions
- Use proper exception handling with custom exceptions
Conclusion
Microsoft Kiota’s support for Go and Python enables developers to build robust, type-safe API clients that leverage the unique strengths of each language. Go’s performance and concurrency model combined with Python’s flexibility and rich ecosystem provide excellent options for different use cases and team preferences.
Resources
Go & Python API Client Development with Microsoft Kiota: Multi-Language Guide
https://mranv.pages.dev/posts/kiota-development/kiota-go-python-client-development/