Aller au contenu principal

๐Ÿ“ก Communication entre Services

Architecture de communication et patterns utilisรฉs dans Commerce Tracking.

๐ŸŽฏ Vue d'ensembleโ€‹

Commerce Tracking utilise NATS comme message broker pour la communication asynchrone entre les microservices. Cette approche garantit la scalabilitรฉ, la rรฉsilience et la dรฉcouplage des services.

Avantages de NATSโ€‹

  • Performance : Trรจs faible latence (< 1ms)
  • Simplicitรฉ : API simple et intuitive
  • Scalabilitรฉ : Support de millions de messages/seconde
  • Haute disponibilitรฉ : Clustering et failover automatique
  • Flexibilitรฉ : Patterns multiples (pub/sub, req/reply, queue groups)

๐Ÿ—๏ธ Architecture de communicationโ€‹

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ CLIENT LAYER โ”‚
โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚
โ”‚ โ”‚ Web Client โ”‚ โ”‚ Mobile App โ”‚ โ”‚ API Client โ”‚ โ”‚
โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚
โ”‚ HTTP/REST
โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ API GATEWAY โ”‚
โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚
โ”‚ โ”‚ Gateway API (NestJS) โ”‚ โ”‚
โ”‚ โ”‚ โ€ข NATS Client Integration โ”‚ โ”‚
โ”‚ โ”‚ โ€ข Request Transformation โ”‚ โ”‚
โ”‚ โ”‚ โ€ข Response Aggregation โ”‚ โ”‚
โ”‚ โ”‚ โ€ข Error Handling โ”‚ โ”‚
โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚
โ”‚ NATS Messages
โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ NATS SERVER โ”‚
โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚
โ”‚ โ”‚ Message Broker โ”‚ โ”‚
โ”‚ โ”‚ โ€ข Subject-based Routing โ”‚ โ”‚
โ”‚ โ”‚ โ€ข Queue Groups โ”‚ โ”‚
โ”‚ โ”‚ โ€ข Load Balancing โ”‚ โ”‚
โ”‚ โ”‚ โ€ข Message Persistence โ”‚ โ”‚
โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
โ”‚
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚ โ”‚ โ”‚
โ–ผ โ–ผ โ–ผ
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚Auth Service โ”‚ โ”‚Admin Serviceโ”‚ โ”‚TradeFlow Svcโ”‚
โ”‚ โ”‚ โ”‚ โ”‚ โ”‚ โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

๐Ÿ“จ Patterns de communicationโ€‹

1. Request-Response Patternโ€‹

Usage : Communication synchrone entre services

// Gateway โ†’ Auth Service
const response = await natsClient.send('auth.validate', {
token: 'jwt-token-here'
});

// Auth Service response
{
success: true,
result: {
isValid: true,
user: {
id: 123,
username: 'john.doe',
role_id: 4
}
}
}

Exemples d'utilisation :

  • Validation des tokens JWT
  • Rรฉcupรฉration des donnรฉes utilisateur
  • Validation des permissions
  • Requรชtes de donnรฉes de rรฉfรฉrence

2. Publish-Subscribe Patternโ€‹

Usage : Notifications et รฉvรฉnements asynchrones

// Publication d'un รฉvรฉnement
natsClient.emit('tradeflow.collection.created', {
collectionId: 123,
collectorId: 456,
timestamp: new Date().toISOString()
});

// Souscription ร  l'รฉvรฉnement
natsClient.subscribe('tradeflow.collection.created', (data) => {
console.log('Nouvelle collection crรฉรฉe:', data);
// Traitement asynchrone
});

Exemples d'utilisation :

  • Notifications de crรฉation/modification de collections
  • ร‰vรฉnements d'audit
  • Mises ร  jour de cache
  • Notifications systรจme

3. Queue Groups Patternโ€‹

Usage : Traitement distribuรฉ et load balancing

// Service A - Producteur
natsClient.emit('tradeflow.validation.queue', {
collectionId: 123,
validationLevel: 1
});

// Service B - Consommateur (Worker 1)
natsClient.subscribe('tradeflow.validation.queue',
{ queue: 'validation-workers' },
(data) => {
console.log('Worker 1 traite:', data);
}
);

// Service C - Consommateur (Worker 2)
natsClient.subscribe('tradeflow.validation.queue',
{ queue: 'validation-workers' },
(data) => {
console.log('Worker 2 traite:', data);
}
);

Exemples d'utilisation :

  • Traitement des validations
  • Gรฉnรฉration de rapports
  • Envoi d'emails
  • Traitement des fichiers

๐ŸŽฏ Sujets NATS (Subjects)โ€‹

Convention de nommageโ€‹

<service>.<module>.<action>

Sujets par serviceโ€‹

Auth Serviceโ€‹

'auth.login'              // Connexion utilisateur
'auth.logout' // Dรฉconnexion
'auth.validate' // Validation token
'auth.refresh' // Rafraรฎchissement token
'auth.profile' // Profil utilisateur
'auth.permissions' // Vรฉrification permissions

Admin Serviceโ€‹

'admin.users.create'      // Crรฉer utilisateur
'admin.users.update' // Modifier utilisateur
'admin.users.delete' // Supprimer utilisateur
'admin.users.list' // Lister utilisateurs
'admin.roles.manage' // Gestion des rรดles
'admin.actors.create' // Crรฉer acteur
'admin.actors.update' // Modifier acteur
'admin.config.update' // Configuration systรจme

TradeFlow Serviceโ€‹

'tradeflow.collections.create'     // Crรฉer collection
'tradeflow.collections.update' // Modifier collection
'tradeflow.collections.delete' // Supprimer collection
'tradeflow.collections.get' // Rรฉcupรฉrer collection
'tradeflow.collections.list' // Lister collections
'tradeflow.validation.submit' // Soumettre validation
'tradeflow.validation.approve' // Approuver validation
'tradeflow.validation.reject' // Rejeter validation
'tradeflow.reports.generate' // Gรฉnรฉrer rapport
'tradeflow.statistics.calculate' // Calculer statistiques

Model Serviceโ€‹

'model.definitions.get'    // Rรฉcupรฉrer dรฉfinitions
'model.definitions.create' // Crรฉer dรฉfinition
'model.types.list' // Lister types
'model.schemas.validate' // Valider schรฉma
'model.metadata.get' // Rรฉcupรฉrer mรฉtadonnรฉes

Common Dataโ€‹

'common.countries'         // Donnรฉes pays
'common.currencies' // Donnรฉes devises
'common.products' // Donnรฉes produits
'common.animals' // Donnรฉes animaux
'common.transport.modes' // Modes de transport
'common.geo.regions' // Rรฉgions gรฉographiques

๐Ÿ”„ Flux de communicationโ€‹

Flux d'authentificationโ€‹

sequenceDiagram
participant C as Client
participant G as Gateway
participant A as Auth Service
participant D as Database

C->>G: POST /auth/login {username, password}
G->>A: nats.send('auth.login', {username, password})
A->>D: SELECT user WHERE username = ?
D-->>A: User data
A->>A: bcrypt.compare(password, hash)
A->>A: Generate JWT token
A-->>G: {token, user, expires_in}
G-->>C: Authentication success

Flux de crรฉation de collectionโ€‹

sequenceDiagram
participant C as Client
participant G as Gateway
participant T as TradeFlow Service
participant A as Auth Service
participant D as Database

C->>G: POST /trade-flow/collections {data}
G->>A: nats.send('auth.validate', {token})
A-->>G: {isValid: true, user: {...}}
G->>T: nats.send('tradeflow.collections.create', {data, user})
T->>D: INSERT INTO collections ...
D-->>T: {id: 123, public_id: 'uuid'}
T-->>G: {success: true, result: collection}
T->>T: nats.emit('tradeflow.collection.created', {collectionId})
G-->>C: Collection created

Flux de validation multi-niveauxโ€‹

sequenceDiagram
participant C as Client
participant G as Gateway
participant T as TradeFlow Service
participant D as Database

C->>G: POST /trade-flow/collections/123/validate
G->>T: nats.send('tradeflow.validation.submit', {collectionId: 123})
T->>D: UPDATE collections SET status = 'pending_validation'
T->>T: nats.emit('tradeflow.validation.queue', {collectionId, level: 1})
T-->>G: {success: true, message: 'Submitted for validation'}
G-->>C: Validation submitted

Note over T: Validation Worker
T->>D: SELECT collection WHERE id = 123
T->>T: Process validation logic
T->>D: UPDATE collections SET status = 'validated'
T->>T: nats.emit('tradeflow.validation.completed', {collectionId})

๐Ÿ› ๏ธ Configuration NATSโ€‹

Configuration du clientโ€‹

// Configuration dans chaque service
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';

const natsClient = ClientProxyFactory.create({
transport: Transport.NATS,
options: {
servers: process.env.NATS_URLS?.split(',') || ['nats://localhost:4222'],
reconnect: true,
maxReconnectAttempts: 10,
reconnectTimeWait: 2000,
timeout: 5000,
maxPayload: 1024 * 1024, // 1MB
name: 'commerce-tracking-service',
user: process.env.NATS_USER,
pass: process.env.NATS_PASS,
},
});

Configuration du serveurโ€‹

# nats-server.conf
port: 4222
http_port: 8222

# Clustering (pour haute disponibilitรฉ)
cluster {
port: 6222
routes: [
"nats://nats-1:6222"
"nats://nats-2:6222"
"nats://nats-3:6222"
]
}

# Persistance
jetstream {
store_dir: "/data/jetstream"
max_memory_store: 2GB
max_file_store: 10GB
}

# Monitoring
monitor_port: 8222

๐Ÿ”ง Gestion des erreursโ€‹

Timeout et retryโ€‹

// Configuration du timeout
const response = await natsClient.send('auth.validate', { token }, {
timeout: 5000 // 5 secondes
});

// Gestion des erreurs avec retry
async function sendWithRetry(subject: string, data: any, maxRetries = 3) {
for (let i = 0; i < maxRetries; i++) {
try {
return await natsClient.send(subject, data, { timeout: 5000 });
} catch (error) {
if (i === maxRetries - 1) throw error;
console.log(`Tentative ${i + 1} รฉchouรฉe, retry dans ${1000 * (i + 1)}ms`);
await new Promise(resolve => setTimeout(resolve, 1000 * (i + 1)));
}
}
}

Circuit breakerโ€‹

class CircuitBreaker {
private failures = 0;
private lastFailureTime = 0;
private readonly threshold = 5;
private readonly timeout = 60000; // 1 minute

async call(subject: string, data: any) {
if (this.isOpen()) {
throw new Error('Circuit breaker is open');
}

try {
const result = await natsClient.send(subject, data);
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
}
}

private isOpen(): boolean {
return this.failures >= this.threshold &&
(Date.now() - this.lastFailureTime) < this.timeout;
}

private onSuccess() {
this.failures = 0;
}

private onFailure() {
this.failures++;
this.lastFailureTime = Date.now();
}
}

๐Ÿ“Š Monitoring et observabilitรฉโ€‹

Mรฉtriques NATSโ€‹

// Mรฉtriques de performance
const metrics = {
messagesSent: 0,
messagesReceived: 0,
errors: 0,
averageLatency: 0,
connections: 0
};

// Middleware de monitoring
natsClient.subscribe('*', (data, subject) => {
metrics.messagesReceived++;
console.log(`Message reรงu sur ${subject}:`, data);
});

Health checksโ€‹

// Health check NATS
async function checkNatsHealth(): Promise<boolean> {
try {
await natsClient.send('health.ping', {}, { timeout: 1000 });
return true;
} catch (error) {
console.error('NATS health check failed:', error);
return false;
}
}

Cette architecture de communication basรฉe sur NATS garantit la scalabilitรฉ, la rรฉsilience et la performance du systรจme Commerce Tracking. Les patterns utilisรฉs permettent une รฉvolution flexible et une maintenance simplifiรฉe.