๐ก Communication entre Services
Architecture de communication et patterns utilisรฉs dans Commerce Tracking.
๐ฏ Vue d'ensembleโ
Commerce Tracking utilise NATS comme message broker pour la communication asynchrone entre les microservices. Cette approche garantit la scalabilitรฉ, la rรฉsilience et la dรฉcouplage des services.
Avantages de NATSโ
- Performance : Trรจs faible latence (< 1ms)
- Simplicitรฉ : API simple et intuitive
- Scalabilitรฉ : Support de millions de messages/seconde
- Haute disponibilitรฉ : Clustering et failover automatique
- Flexibilitรฉ : Patterns multiples (pub/sub, req/reply, queue groups)
๐๏ธ Architecture de communicationโ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ CLIENT LAYER โ
โ โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ โ
โ โ Web Client โ โ Mobile App โ โ API Client โ โ
โ โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โ HTTP/REST
โผ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ API GATEWAY โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ Gateway API (NestJS) โ โ
โ โ โข NATS Client Integration โ โ
โ โ โข Request Transformation โ โ
โ โ โข Response Aggregation โ โ
โ โ โข Error Handling โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โ NATS Messages
โผ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ NATS SERVER โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โ โ Message Broker โ โ
โ โ โข Subject-based Routing โ โ
โ โ โข Queue Groups โ โ
โ โ โข Load Balancing โ โ
โ โ โข Message Persistence โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ
โโโโโโโโโโโโโผโโโโโโโโโโโโ
โ โ โ
โผ โผ โผ
โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ
โAuth Service โ โAdmin Serviceโ โTradeFlow Svcโ
โ โ โ โ โ โ
โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโ
๐จ Patterns de communicationโ
1. Request-Response Patternโ
Usage : Communication synchrone entre services
// Gateway โ Auth Service
const response = await natsClient.send('auth.validate', {
token: 'jwt-token-here'
});
// Auth Service response
{
success: true,
result: {
isValid: true,
user: {
id: 123,
username: 'john.doe',
role_id: 4
}
}
}
Exemples d'utilisation :
- Validation des tokens JWT
- Rรฉcupรฉration des donnรฉes utilisateur
- Validation des permissions
- Requรชtes de donnรฉes de rรฉfรฉrence
2. Publish-Subscribe Patternโ
Usage : Notifications et รฉvรฉnements asynchrones
// Publication d'un รฉvรฉnement
natsClient.emit('tradeflow.collection.created', {
collectionId: 123,
collectorId: 456,
timestamp: new Date().toISOString()
});
// Souscription ร l'รฉvรฉnement
natsClient.subscribe('tradeflow.collection.created', (data) => {
console.log('Nouvelle collection crรฉรฉe:', data);
// Traitement asynchrone
});
Exemples d'utilisation :
- Notifications de crรฉation/modification de collections
- รvรฉnements d'audit
- Mises ร jour de cache
- Notifications systรจme
3. Queue Groups Patternโ
Usage : Traitement distribuรฉ et load balancing
// Service A - Producteur
natsClient.emit('tradeflow.validation.queue', {
collectionId: 123,
validationLevel: 1
});
// Service B - Consommateur (Worker 1)
natsClient.subscribe('tradeflow.validation.queue',
{ queue: 'validation-workers' },
(data) => {
console.log('Worker 1 traite:', data);
}
);
// Service C - Consommateur (Worker 2)
natsClient.subscribe('tradeflow.validation.queue',
{ queue: 'validation-workers' },
(data) => {
console.log('Worker 2 traite:', data);
}
);
Exemples d'utilisation :
- Traitement des validations
- Gรฉnรฉration de rapports
- Envoi d'emails
- Traitement des fichiers
๐ฏ Sujets NATS (Subjects)โ
Convention de nommageโ
<service>.<module>.<action>
Sujets par serviceโ
Auth Serviceโ
'auth.login' // Connexion utilisateur
'auth.logout' // Dรฉconnexion
'auth.validate' // Validation token
'auth.refresh' // Rafraรฎchissement token
'auth.profile' // Profil utilisateur
'auth.permissions' // Vรฉrification permissions
Admin Serviceโ
'admin.users.create' // Crรฉer utilisateur
'admin.users.update' // Modifier utilisateur
'admin.users.delete' // Supprimer utilisateur
'admin.users.list' // Lister utilisateurs
'admin.roles.manage' // Gestion des rรดles
'admin.actors.create' // Crรฉer acteur
'admin.actors.update' // Modifier acteur
'admin.config.update' // Configuration systรจme
TradeFlow Serviceโ
'tradeflow.collections.create' // Crรฉer collection
'tradeflow.collections.update' // Modifier collection
'tradeflow.collections.delete' // Supprimer collection
'tradeflow.collections.get' // Rรฉcupรฉrer collection
'tradeflow.collections.list' // Lister collections
'tradeflow.validation.submit' // Soumettre validation
'tradeflow.validation.approve' // Approuver validation
'tradeflow.validation.reject' // Rejeter validation
'tradeflow.reports.generate' // Gรฉnรฉrer rapport
'tradeflow.statistics.calculate' // Calculer statistiques
Model Serviceโ
'model.definitions.get' // Rรฉcupรฉrer dรฉfinitions
'model.definitions.create' // Crรฉer dรฉfinition
'model.types.list' // Lister types
'model.schemas.validate' // Valider schรฉma
'model.metadata.get' // Rรฉcupรฉrer mรฉtadonnรฉes
Common Dataโ
'common.countries' // Donnรฉes pays
'common.currencies' // Donnรฉes devises
'common.products' // Donnรฉes produits
'common.animals' // Donnรฉes animaux
'common.transport.modes' // Modes de transport
'common.geo.regions' // Rรฉgions gรฉographiques
๐ Flux de communicationโ
Flux d'authentificationโ
sequenceDiagram
participant C as Client
participant G as Gateway
participant A as Auth Service
participant D as Database
C->>G: POST /auth/login {username, password}
G->>A: nats.send('auth.login', {username, password})
A->>D: SELECT user WHERE username = ?
D-->>A: User data
A->>A: bcrypt.compare(password, hash)
A->>A: Generate JWT token
A-->>G: {token, user, expires_in}
G-->>C: Authentication success
Flux de crรฉation de collectionโ
sequenceDiagram
participant C as Client
participant G as Gateway
participant T as TradeFlow Service
participant A as Auth Service
participant D as Database
C->>G: POST /trade-flow/collections {data}
G->>A: nats.send('auth.validate', {token})
A-->>G: {isValid: true, user: {...}}
G->>T: nats.send('tradeflow.collections.create', {data, user})
T->>D: INSERT INTO collections ...
D-->>T: {id: 123, public_id: 'uuid'}
T-->>G: {success: true, result: collection}
T->>T: nats.emit('tradeflow.collection.created', {collectionId})
G-->>C: Collection created
Flux de validation multi-niveauxโ
sequenceDiagram
participant C as Client
participant G as Gateway
participant T as TradeFlow Service
participant D as Database
C->>G: POST /trade-flow/collections/123/validate
G->>T: nats.send('tradeflow.validation.submit', {collectionId: 123})
T->>D: UPDATE collections SET status = 'pending_validation'
T->>T: nats.emit('tradeflow.validation.queue', {collectionId, level: 1})
T-->>G: {success: true, message: 'Submitted for validation'}
G-->>C: Validation submitted
Note over T: Validation Worker
T->>D: SELECT collection WHERE id = 123
T->>T: Process validation logic
T->>D: UPDATE collections SET status = 'validated'
T->>T: nats.emit('tradeflow.validation.completed', {collectionId})
๐ ๏ธ Configuration NATSโ
Configuration du clientโ
// Configuration dans chaque service
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';
const natsClient = ClientProxyFactory.create({
transport: Transport.NATS,
options: {
servers: process.env.NATS_URLS?.split(',') || ['nats://localhost:4222'],
reconnect: true,
maxReconnectAttempts: 10,
reconnectTimeWait: 2000,
timeout: 5000,
maxPayload: 1024 * 1024, // 1MB
name: 'commerce-tracking-service',
user: process.env.NATS_USER,
pass: process.env.NATS_PASS,
},
});
Configuration du serveurโ
# nats-server.conf
port: 4222
http_port: 8222
# Clustering (pour haute disponibilitรฉ)
cluster {
port: 6222
routes: [
"nats://nats-1:6222"
"nats://nats-2:6222"
"nats://nats-3:6222"
]
}
# Persistance
jetstream {
store_dir: "/data/jetstream"
max_memory_store: 2GB
max_file_store: 10GB
}
# Monitoring
monitor_port: 8222
๐ง Gestion des erreursโ
Timeout et retryโ
// Configuration du timeout
const response = await natsClient.send('auth.validate', { token }, {
timeout: 5000 // 5 secondes
});
// Gestion des erreurs avec retry
async function sendWithRetry(subject: string, data: any, maxRetries = 3) {
for (let i = 0; i < maxRetries; i++) {
try {
return await natsClient.send(subject, data, { timeout: 5000 });
} catch (error) {
if (i === maxRetries - 1) throw error;
console.log(`Tentative ${i + 1} รฉchouรฉe, retry dans ${1000 * (i + 1)}ms`);
await new Promise(resolve => setTimeout(resolve, 1000 * (i + 1)));
}
}
}
Circuit breakerโ
class CircuitBreaker {
private failures = 0;
private lastFailureTime = 0;
private readonly threshold = 5;
private readonly timeout = 60000; // 1 minute
async call(subject: string, data: any) {
if (this.isOpen()) {
throw new Error('Circuit breaker is open');
}
try {
const result = await natsClient.send(subject, data);
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
}
}
private isOpen(): boolean {
return this.failures >= this.threshold &&
(Date.now() - this.lastFailureTime) < this.timeout;
}
private onSuccess() {
this.failures = 0;
}
private onFailure() {
this.failures++;
this.lastFailureTime = Date.now();
}
}
๐ Monitoring et observabilitรฉโ
Mรฉtriques NATSโ
// Mรฉtriques de performance
const metrics = {
messagesSent: 0,
messagesReceived: 0,
errors: 0,
averageLatency: 0,
connections: 0
};
// Middleware de monitoring
natsClient.subscribe('*', (data, subject) => {
metrics.messagesReceived++;
console.log(`Message reรงu sur ${subject}:`, data);
});
Health checksโ
// Health check NATS
async function checkNatsHealth(): Promise<boolean> {
try {
await natsClient.send('health.ping', {}, { timeout: 1000 });
return true;
} catch (error) {
console.error('NATS health check failed:', error);
return false;
}
}
Cette architecture de communication basรฉe sur NATS garantit la scalabilitรฉ, la rรฉsilience et la performance du systรจme Commerce Tracking. Les patterns utilisรฉs permettent une รฉvolution flexible et une maintenance simplifiรฉe.