Version 1.34.2 | Production Ready | Unreal Engine 5.5+
Channel Subscriptions in the Supabase UE5 Plugin provide real-time database change notifications through WebSocket connections. This system allows your Unreal Engine application to receive instant updates when data in your Supabase database changes, enabling reactive user interfaces and real-time multiplayer features.
The Channel Subscription system is built around the UAsyncRealtime class, which manages WebSocket connections to Supabase’s realtime service. It provides thread-safe operations, automatic reconnection handling, and comprehensive error recovery mechanisms.
The system tracks several connection states:
| State | Description |
|---|---|
| Disconnected | No active connection |
| Connecting | Establishing WebSocket connection |
| Connected | WebSocket connected, preparing to join channel |
| Joining Channel | Sending join request to realtime channel |
| Subscribed | Successfully subscribed and receiving events |
| Error | Connection or subscription error occurred |
| Reconnecting | Attempting to reconnect after failure |
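To make the progression through these states concrete, here is a minimal sketch in plain C++ (no Unreal dependencies). The `ConnectionState` enum and both helper functions are hypothetical names invented for illustration. The plugin exposes its own `ERealtimeConnectionState`, and the transition order below is an assumption read off the descriptions in the table, not confirmed plugin behavior.

```cpp
// Hypothetical mirror of the documented states (not the plugin's enum).
enum class ConnectionState {
    Disconnected, Connecting, Connected, JoiningChannel,
    Subscribed, Error, Reconnecting
};

// Assumed happy-path progression: each successful step advances one state.
// Error and Subscribed do not advance on their own in this sketch.
ConnectionState NextOnSuccess(ConnectionState s) {
    switch (s) {
        case ConnectionState::Disconnected:   return ConnectionState::Connecting;
        case ConnectionState::Connecting:     return ConnectionState::Connected;
        case ConnectionState::Connected:      return ConnectionState::JoiningChannel;
        case ConnectionState::JoiningChannel: return ConnectionState::Subscribed;
        case ConnectionState::Reconnecting:   return ConnectionState::Connected;
        default:                              return s;
    }
}

// Per the table, only Subscribed delivers database events.
bool IsReceivingEvents(ConnectionState s) {
    return s == ConnectionState::Subscribed;
}
```

In this sketch, four successful steps take a fresh connection from Disconnected to Subscribed, which is the only state in which change events should be expected.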
// C++ - Basic realtime subscription
UAsyncRealtime* RealtimeNode = UAsyncRealtime::SubscribeToRealtime(
    this,
    Connection,
    TEXT("messages"),
    ESupabaseKey::Authenticated
);
// Bind event handlers
RealtimeNode->OnSuccess.AddDynamic(this, &AMyClass::OnSubscribeSuccess);
RealtimeNode->OnFailure.AddDynamic(this, &AMyClass::OnSubscribeFailure);
RealtimeNode->OnInsert.AddDynamic(this, &AMyClass::OnMessageInserted);
RealtimeNode->OnUpdate.AddDynamic(this, &AMyClass::OnMessageUpdated);
RealtimeNode->OnDelete.AddDynamic(this, &AMyClass::OnMessageDeleted);
// Activate the subscription
RealtimeNode->Activate();
// Advanced subscription with custom configuration
TArray<FString> EventTypes = {TEXT("INSERT"), TEXT("UPDATE")};
UAsyncRealtime* AdvancedRealtime = UAsyncRealtime::SubscribeToRealtimeAdvanced(
    this,
    Connection,
    TEXT("user_profiles"),       // Table name
    EventTypes,                  // Specific event types
    ESupabaseKey::Authenticated, // Key type
    30.0f,                       // Heartbeat interval (seconds)
    true,                        // Enable reconnection
    5,                           // Max reconnect attempts
    30.0f,                       // Connection timeout (seconds)
    TEXT("public")               // Database schema
);
// Bind handlers and call Activate() as in the basic example
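The event-type list passed to the advanced subscription acts as a filter on which changes reach your handlers. The source does not show how the plugin applies it, so the following is only a sketch in plain C++ of what such a filter could look like. `MatchesEventFilter` is a hypothetical helper, and treating `"*"` as a wildcard is an assumption borrowed from the join message format.

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Returns true if eventType is listed in the filter, or if the filter
// contains the "*" wildcard. Comparison is case-sensitive.
bool MatchesEventFilter(const std::vector<std::string>& filter,
                        const std::string& eventType) {
    const bool hasWildcard =
        std::find(filter.begin(), filter.end(), "*") != filter.end();
    const bool listed =
        std::find(filter.begin(), filter.end(), eventType) != filter.end();
    return hasWildcard || listed;
}
```

With the filter `{"INSERT", "UPDATE"}` from the example above, a `DELETE` event would be rejected by this check.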
The system broadcasts three main event types:
INSERT events are triggered when new records are added to the subscribed table.
// Declare this handler in the class header with UFUNCTION(); AddDynamic requires it.
void AMyClass::OnMessageInserted(const FString& EventType, const FString& Payload)
{
    UE_LOG(LogTemp, Log, TEXT("New message inserted: %s"), *Payload);
    // Parse the JSON payload to extract the new record data
    TSharedPtr<FJsonObject> JsonObject;
    TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(Payload);
    if (FJsonSerializer::Deserialize(Reader, JsonObject) && JsonObject.IsValid())
    {
        // "new" holds the inserted row; TryGetObjectField fails cleanly if it is absent
        const TSharedPtr<FJsonObject>* NewRecord = nullptr;
        if (JsonObject->TryGetObjectField(TEXT("new"), NewRecord))
        {
            // Process the new data via (*NewRecord)...
        }
    }
}
UPDATE events are triggered when existing records are modified.
// Declare this handler in the class header with UFUNCTION(); AddDynamic requires it.
void AMyClass::OnMessageUpdated(const FString& EventType, const FString& Payload)
{
    // Parse payload to get both old and new values
    TSharedPtr<FJsonObject> JsonObject;
    TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(Payload);
    if (FJsonSerializer::Deserialize(Reader, JsonObject) && JsonObject.IsValid())
    {
        const TSharedPtr<FJsonObject>* OldRecord = nullptr;
        const TSharedPtr<FJsonObject>* NewRecord = nullptr;
        if (JsonObject->TryGetObjectField(TEXT("old"), OldRecord) &&
            JsonObject->TryGetObjectField(TEXT("new"), NewRecord))
        {
            // Compare old vs new values...
        }
    }
}
DELETE events are triggered when records are removed from the table.
// Declare this handler in the class header with UFUNCTION(); AddDynamic requires it.
void AMyClass::OnMessageDeleted(const FString& EventType, const FString& Payload)
{
    // Parse payload to get the deleted record information
    TSharedPtr<FJsonObject> JsonObject;
    TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(Payload);
    if (FJsonSerializer::Deserialize(Reader, JsonObject) && JsonObject.IsValid())
    {
        const TSharedPtr<FJsonObject>* OldRecord = nullptr;
        if (JsonObject->TryGetObjectField(TEXT("old"), OldRecord))
        {
            // Process the deleted record via (*OldRecord)...
        }
    }
}
Monitor connection health with state change events:
// Bind to connection state changes
RealtimeNode->OnConnectionStateChanged.AddDynamic(this, &AMyClass::OnConnectionStateChanged);
// Declare this handler in the class header with UFUNCTION(); AddDynamic requires it.
void AMyClass::OnConnectionStateChanged(ERealtimeConnectionState NewState)
{
    switch (NewState)
    {
    case ERealtimeConnectionState::Subscribed:
        UE_LOG(LogTemp, Log, TEXT("Successfully subscribed to realtime updates"));
        break;
    case ERealtimeConnectionState::Error:
        UE_LOG(LogTemp, Error, TEXT("Realtime connection error"));
        break;
    case ERealtimeConnectionState::Reconnecting:
        UE_LOG(LogTemp, Warning, TEXT("Attempting to reconnect..."));
        break;
    default:
        // Remaining states need no special handling here
        break;
    }
}
// Check connection state
ERealtimeConnectionState CurrentState = RealtimeNode->GetConnectionState();
// Check if connection is healthy
bool bIsHealthy = RealtimeNode->IsConnectionHealthy();
// Manual reconnection
RealtimeNode->Reconnect();
// Disconnect
RealtimeNode->Disconnect();
The system includes robust automatic reconnection with exponential backoff:
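As an illustration of exponential backoff in general, the sketch below computes the wait before each reconnect attempt in plain C++. The function names, the 1-second base delay, and the 30-second cap are all assumptions chosen for the example. The source does not state the plugin's actual values or formula.

```cpp
#include <algorithm>
#include <cmath>

// Delay before reconnect attempt `attempt` (0-based): base * 2^attempt,
// capped at maxSeconds. Base and cap are illustrative, not plugin defaults.
double BackoffDelaySeconds(int attempt,
                           double baseSeconds = 1.0,
                           double maxSeconds = 30.0) {
    if (attempt < 0) attempt = 0;
    const double delay = baseSeconds * std::pow(2.0, attempt);
    return std::min(delay, maxSeconds);
}

// True while another attempt is allowed under a max-attempts limit,
// such as the "Max reconnect attempts" parameter of the advanced subscription.
bool ShouldRetry(int attemptsMade, int maxAttempts) {
    return attemptsMade < maxAttempts;
}
```

Under these assumed values and a limit of 5 attempts, the waits would be 1, 2, 4, 8, and 16 seconds. Production implementations often add random jitter so that many clients do not reconnect at the same instant.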
When subscribing to a channel, the system sends a Phoenix protocol join message:
{
"topic": "realtime:public:messages",
"event": "phx_join",
"payload": {
"config": {
"postgres_changes": [
{
"event": "*",
"schema": "public",
"table": "messages"
}
],
"presence": {},
"broadcast": {}
}
},
"ref": "1"
}
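The topic in the join message above follows the pattern `realtime:<schema>:<table>`. The sketch below assembles the topic and the full join message in plain C++ to show how the pieces fit together. `MakeRealtimeTopic` and `MakeJoinMessage` are hypothetical helpers, not plugin functions, and the code performs no JSON escaping, so it assumes schema and table names are plain identifiers.

```cpp
#include <string>

// Builds the channel topic, e.g. "realtime:public:messages".
std::string MakeRealtimeTopic(const std::string& schema,
                              const std::string& table) {
    return "realtime:" + schema + ":" + table;
}

// Assembles a phx_join message for a single table, mirroring the JSON
// structure shown above. Inputs are inserted verbatim (no escaping).
std::string MakeJoinMessage(const std::string& schema,
                            const std::string& table,
                            const std::string& event,
                            int ref) {
    std::string msg;
    msg += "{\"topic\":\"" + MakeRealtimeTopic(schema, table) + "\",";
    msg += "\"event\":\"phx_join\",";
    msg += "\"payload\":{\"config\":{\"postgres_changes\":[{";
    msg += "\"event\":\"" + event + "\",";
    msg += "\"schema\":\"" + schema + "\",";
    msg += "\"table\":\"" + table + "\"}],";
    msg += "\"presence\":{},\"broadcast\":{}}},";
    msg += "\"ref\":\"" + std::to_string(ref) + "\"}";
    return msg;
}
```

Calling `MakeJoinMessage("public", "messages", "*", 1)` yields a compact, single-line equivalent of the message shown above. In Unreal code you would normally build this with `FJsonObject` and `FJsonSerializer` rather than string concatenation.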
Database change events arrive with this structure:
{
"topic": "realtime:public:messages",
"event": "postgres_changes",
"payload": {
"eventType": "INSERT",
"new": {
"id": 123,
"content": "Hello World",
"created_at": "2024-01-01T12:00:00Z"
},
"old": null,
"table": "messages",
"schema": "public"
}
}
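Which of the `old` and `new` fields carries a record depends on the `eventType`, as the INSERT example above (with `"old": null`) and the handlers earlier in this page suggest. The following plain C++ sketch encodes that relationship. `ChangeKind`, `RecordPresence`, and both functions are hypothetical names introduced for illustration.

```cpp
#include <string>

enum class ChangeKind { Insert, Update, Delete, Unknown };

// Maps the payload's eventType string to an enum (case-sensitive).
ChangeKind ParseChangeKind(const std::string& eventType) {
    if (eventType == "INSERT") return ChangeKind::Insert;
    if (eventType == "UPDATE") return ChangeKind::Update;
    if (eventType == "DELETE") return ChangeKind::Delete;
    return ChangeKind::Unknown;
}

struct RecordPresence {
    bool hasOld;  // payload is expected to carry an "old" record
    bool hasNew;  // payload is expected to carry a "new" record
};

// INSERT carries only "new", DELETE only "old", UPDATE both.
RecordPresence ExpectedRecords(ChangeKind kind) {
    switch (kind) {
        case ChangeKind::Insert: return {false, true};
        case ChangeKind::Update: return {true, true};
        case ChangeKind::Delete: return {true, false};
        default:                 return {false, false};
    }
}
```

A handler can use a check like this to decide which fields to read before touching them. How many columns of `old` are actually populated depends on the table's replica identity configuration in Postgres.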
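// Declare this handler in the class header with UFUNCTION(); AddDynamic requires it.
void AMyClass::OnSubscribeFailure(const FString& ErrorMessage)
{
    UE_LOG(LogTemp, Error, TEXT("Subscription failed: %s"), *ErrorMessage);
    // Implement your error recovery logic.
    // FString::Contains is case-insensitive by default.
    if (ErrorMessage.Contains(TEXT("authentication")))
    {
        // Refresh authentication token (RefreshAuthToken is your own helper)
        RefreshAuthToken();
    }
    else if (ErrorMessage.Contains(TEXT("network")))
    {
        // Wait and retry (RetrySubscriptionAfterDelay is your own helper)
        RetrySubscriptionAfterDelay();
    }
}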
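The substring checks in the handler above can be factored into a small classifier, which keeps the recovery logic easy to test. The sketch below does this in plain C++. `FailureKind` and `ClassifyFailure` are hypothetical names, and the keywords are simply those used in the handler. The source does not list the plugin's actual error strings, so matching on message text should be treated as brittle.

```cpp
#include <algorithm>
#include <cctype>
#include <string>

enum class FailureKind { Authentication, Network, Other };

// Lowercases the message, then looks for the same keywords as the handler.
FailureKind ClassifyFailure(std::string message) {
    std::transform(message.begin(), message.end(), message.begin(),
                   [](unsigned char c) {
                       return static_cast<char>(std::tolower(c));
                   });
    if (message.find("authentication") != std::string::npos) {
        return FailureKind::Authentication;
    }
    if (message.find("network") != std::string::npos) {
        return FailureKind::Network;
    }
    return FailureKind::Other;
}
```

Anything classified as `Other` is a reasonable candidate for logging and surfacing to the user rather than retrying blindly.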
All operations are thread-safe and use appropriate synchronization:
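The source does not show how the plugin synchronizes internally. One common pattern for this kind of system is a mutex-guarded queue: the network thread pushes incoming payloads and the game thread drains them once per frame. The plain C++ sketch below illustrates that pattern only. `EventQueue` is a hypothetical class, not part of the plugin.

```cpp
#include <mutex>
#include <string>
#include <utility>
#include <vector>

class EventQueue {
public:
    // Called from the producer (e.g. network) thread.
    void Push(std::string payload) {
        std::lock_guard<std::mutex> lock(mutex_);
        pending_.push_back(std::move(payload));
    }

    // Called from the consumer (e.g. game) thread. Takes everything queued
    // so far in arrival order and leaves the queue empty.
    std::vector<std::string> Drain() {
        std::vector<std::string> out;
        std::lock_guard<std::mutex> lock(mutex_);
        out.swap(pending_);
        return out;
    }

private:
    std::mutex mutex_;
    std::vector<std::string> pending_;
};
```

Swapping the vector out under the lock keeps the critical section short, so the producer is never blocked while the consumer processes events.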
Subscription Not Receiving Events
Connection Keeps Dropping
Events Arrive Out of Order
Enable detailed logging for troubleshooting:
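// VeryVerbose messages are hidden at default verbosity. Raise the category first,
// for example with the console command: Log LogSupabaseRealtime VeryVerbose
UE_LOG(LogSupabaseRealtime, VeryVerbose, TEXT("Debug message"));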
The system provides comprehensive logging at different verbosity levels:
Ensure proper RLS policies are configured:
-- Example RLS policy for user-specific data
CREATE POLICY "Users can only see their own messages" ON messages
FOR SELECT USING (auth.uid() = user_id);
CREATE POLICY "Users can only insert their own messages" ON messages
FOR INSERT WITH CHECK (auth.uid() = user_id);