Version 1.34.2 | Production Ready | Thread-Safe Implementation
The Supabase UE5 Plugin provides a comprehensive event system for handling real-time database changes, authentication events, and general operations. The event system is designed with thread safety, memory management, and Blueprint integration as core principles.
The plugin automatically handles PostgreSQL database changes through WebSocket connections:
// C++ Example - Subscribing to database changes
UAsyncRealtime* RealtimeTask = UAsyncRealtime::AsyncRealtime(
this,
TEXT("users"), // Table name
TEXT("public"), // Schema (optional)
TArray<FString>() // Event types (empty = all)
);
// Bind to specific change events
RealtimeTask->OnInsert.AddDynamic(this, &AMyActor::HandleInsert);
RealtimeTask->OnUpdate.AddDynamic(this, &AMyActor::HandleUpdate);
RealtimeTask->OnDelete.AddDynamic(this, &AMyActor::HandleDelete);
RealtimeTask->Activate();
| Event Type | Description | Payload Content |
|---|---|---|
| INSERT | New row added | Complete new row data |
| UPDATE | Existing row modified | Old and new row data |
| DELETE | Row removed | Deleted row identifier |
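Handlers receive the event type as a string, so it can help to convert it to an enum once before branching. The following is a minimal sketch in plain standard C++ (C++17); `EChangeEvent` and `ParseChangeEvent` are hypothetical names for illustration and are not part of the plugin API.

```cpp
#include <optional>
#include <string_view>

// Hypothetical enum mirroring the three event types listed in the table.
enum class EChangeEvent { Insert, Update, Delete };

// Maps the event name to the enum. Returns std::nullopt for any name that
// is not one of the three documented types (the match is case-sensitive).
inline std::optional<EChangeEvent> ParseChangeEvent(std::string_view Name)
{
    if (Name == "INSERT") return EChangeEvent::Insert;
    if (Name == "UPDATE") return EChangeEvent::Update;
    if (Name == "DELETE") return EChangeEvent::Delete;
    return std::nullopt;
}
```

Returning an empty optional for unknown names lets the caller log and skip unexpected event types instead of silently treating them as one of the known three.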
Every database change event carries a structured JSON payload:
{
"eventType": "INSERT|UPDATE|DELETE",
"schema": "public",
"table": "users",
"columns": [...],
"commit_timestamp": "2024-01-01T00:00:00Z",
"errors": null
}
The recommended approach uses the USupabaseSubsystem for centralized event management:
// Get subsystem reference (check for null before binding)
USupabaseSubsystem* Subsystem = USupabaseManager::GetSupabaseSubsystem(this);
if (Subsystem)
{
// Bind to authentication events
Subsystem->OnLoginSuccessful.AddDynamic(this, &AMyActor::HandleLoginSuccess);
Subsystem->OnLoginFailed.AddDynamic(this, &AMyActor::HandleLoginFailure);
// Bind to operation events
Subsystem->OnQueryCompleted.AddDynamic(this, &AMyActor::HandleQueryResult);
}
Individual async operations provide their own event delegates:
UAsyncQuery* QueryTask = UAsyncQuery::AsyncQuery(this, TEXT("users"), TEXT("*"));
QueryTask->OnSuccess.AddDynamic(this, &AMyActor::OnQuerySuccess);
QueryTask->OnFailure.AddDynamic(this, &AMyActor::OnQueryFailure);
All events are automatically marshaled to the game thread using SafeBroadcastOnGameThread:
// Internal implementation (reference only)
SafeBroadcastOnGameThread([this, EventType, PayloadString]() {
if (bIsShuttingDown || !IsValid(this)) {
return;
}
FScopeLock Lock(&DelegateMutex);
if (EventType == TEXT("INSERT")) {
OnInsert.Broadcast(EventType, PayloadString);
}
// ... other event types
});
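The marshaling idea can be illustrated outside Unreal with a small task queue: any thread may post a callback, but callbacks only run on the thread that drains the queue. This is a sketch in plain standard C++ and not the plugin's `SafeBroadcastOnGameThread`; `FMainThreadQueue`, `Post`, and `Drain` are hypothetical names.

```cpp
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical stand-in for game-thread marshaling: Post() is safe to call
// from any thread, while callbacks execute only inside Drain().
class FMainThreadQueue
{
public:
    void Post(std::function<void()> Task)
    {
        std::lock_guard<std::mutex> Lock(Mutex);
        Tasks.push(std::move(Task));
    }

    // Call from the owning thread (for example once per frame).
    // Returns the number of tasks that were executed.
    int Drain()
    {
        // Swap the pending tasks out under the lock, then run them without
        // holding it so a task may safely call Post() again.
        std::queue<std::function<void()>> Local;
        {
            std::lock_guard<std::mutex> Lock(Mutex);
            std::swap(Local, Tasks);
        }
        int Count = 0;
        while (!Local.empty())
        {
            Local.front()();
            Local.pop();
            ++Count;
        }
        return Count;
    }

private:
    std::mutex Mutex;
    std::queue<std::function<void()>> Tasks;
};
```

Running the callbacks outside the lock is a deliberate choice in this sketch: it avoids a deadlock if a handler posts a follow-up task while it is being executed.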
Real-time connections maintain thread-safe state tracking:
// Connection states
enum class ERealtimeConnectionState : uint8
{
Disconnected,
Connecting,
Connected,
Subscribed,
Error
};
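One common way to keep such a state value safe across threads is to store it in an atomic and change it with compare-and-swap. The sketch below assumes that approach; it is written in plain standard C++, redeclares the enum with `std::uint8_t` so it compiles outside Unreal, and `FConnectionStateTracker` is a hypothetical name rather than a plugin class.

```cpp
#include <atomic>
#include <cstdint>

// Redeclared here only so the sketch compiles on its own.
enum class ERealtimeConnectionState : std::uint8_t
{
    Disconnected,
    Connecting,
    Connected,
    Subscribed,
    Error
};

// Hypothetical holder for the connection state, readable and writable
// from any thread without an explicit lock.
class FConnectionStateTracker
{
public:
    ERealtimeConnectionState Get() const
    {
        return State.load(std::memory_order_acquire);
    }

    // Moves to Next only if the current state still equals Expected.
    // Returns false if another thread changed the state first.
    bool TryTransition(ERealtimeConnectionState Expected, ERealtimeConnectionState Next)
    {
        return State.compare_exchange_strong(Expected, Next, std::memory_order_acq_rel);
    }

private:
    std::atomic<ERealtimeConnectionState> State{ERealtimeConnectionState::Disconnected};
};
```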
All event handlers are automatically cleaned up to prevent memory leaks:
void UAsyncRealtime::SafeCleanup()
{
// Stop heartbeat timer (GetWorld() can return null during teardown)
if (HeartbeatTimerHandle.IsValid())
{
if (UWorld* World = GetWorld())
{
World->GetTimerManager().ClearTimer(HeartbeatTimerHandle);
}
}
// Close WebSocket connection
if (WebSocket.IsValid())
{
WebSocket->OnConnected().RemoveAll(this);
WebSocket->OnConnectionError().RemoveAll(this);
WebSocket->OnClosed().RemoveAll(this);
WebSocket->OnMessage().RemoveAll(this);
WebSocket->Close();
WebSocket.Reset();
}
// Clear references
Connection = nullptr;
WorldContextObject = nullptr;
}
Always unbind events in cleanup methods:
void UAsyncLogin::UnbindEvents()
{
if (bUseSubsystem)
{
USupabaseSubsystem* Subsystem = USupabaseManager::GetSupabaseSubsystem(WorldContextObject);
if (Subsystem)
{
Subsystem->OnLoginSuccessful.RemoveDynamic(this, &UAsyncLogin::HandleLoginSuccess);
Subsystem->OnLoginFailed.RemoveDynamic(this, &UAsyncLogin::HandleLoginFailure);
}
}
}
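The bind-then-unbind pairing can also be tied to object lifetime so that the unbind cannot be forgotten. The following sketch shows that pattern in plain standard C++; `FSimpleEvent` and `FScopedBinding` are hypothetical types for illustration and do not represent Unreal's dynamic delegates or the plugin's classes.

```cpp
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical minimal multicast event (not an Unreal delegate).
class FSimpleEvent
{
public:
    using FHandler = std::function<void(const std::string&)>;

    // Registers a handler and returns an id that can be used to remove it.
    int Add(FHandler Handler)
    {
        const int Id = NextId++;
        Handlers.emplace(Id, std::move(Handler));
        return Id;
    }

    void Remove(int Id) { Handlers.erase(Id); }

    void Broadcast(const std::string& Payload) const
    {
        for (const auto& Entry : Handlers)
        {
            Entry.second(Payload);
        }
    }

private:
    int NextId = 1;
    std::map<int, FHandler> Handlers;
};

// Binds in the constructor and unbinds in the destructor.
// The event must outlive the binding.
class FScopedBinding
{
public:
    FScopedBinding(FSimpleEvent& InEvent, FSimpleEvent::FHandler Handler)
        : Event(InEvent), Id(InEvent.Add(std::move(Handler)))
    {
    }

    ~FScopedBinding() { Event.Remove(Id); }

    FScopedBinding(const FScopedBinding&) = delete;
    FScopedBinding& operator=(const FScopedBinding&) = delete;

private:
    FSimpleEvent& Event;
    int Id;
};
```

In this sketch a handler stops receiving broadcasts as soon as its `FScopedBinding` goes out of scope, which mirrors what an explicit `UnbindEvents()` call achieves.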
Use the Async Realtime node in Blueprint:
Create Blueprint functions to handle events:
// Blueprint function signature examples
UFUNCTION(BlueprintImplementableEvent)
void HandleUserInsert(const FString& EventType, const FString& PayloadData);
UFUNCTION(BlueprintImplementableEvent)
void HandleUserUpdate(const FString& EventType, const FString& PayloadData);
The system automatically handles connection errors with retry logic:
// Error path: log, update state, and notify listeners (reconnection and backoff scheduling are not part of this excerpt)
void UAsyncRealtime::HandleConnectionError(const FString& Error)
{
UE_LOG(LogSupabaseRealtime, Warning, TEXT("WebSocket connection error: %s"), *Error);
SetConnectionState(ERealtimeConnectionState::Error);
SafeBroadcastOnGameThread([this, Error]() {
OnFailure.Broadcast(FString::Printf(TEXT("WebSocket error: %s"), *Error));
SetReadyToDestroy();
});
}
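The handler above does not show how a retry delay would be computed. As a sketch of exponential backoff in general, assuming a delay that doubles per attempt up to a cap, the calculation could look like this in plain standard C++. `ComputeBackoffSeconds` and its parameters are hypothetical and are not taken from the plugin.

```cpp
#include <algorithm>
#include <cassert>

// Hypothetical helper: delay before reconnect attempt number Attempt
// (0-based). The delay starts at BaseSeconds, doubles with each attempt,
// and never exceeds MaxSeconds.
inline double ComputeBackoffSeconds(int Attempt, double BaseSeconds, double MaxSeconds)
{
    assert(Attempt >= 0 && BaseSeconds > 0.0 && MaxSeconds >= BaseSeconds);
    double Delay = BaseSeconds;
    // Stop doubling once the cap is reached so large attempt counts
    // cannot overflow the value.
    for (int i = 0; i < Attempt && Delay < MaxSeconds; ++i)
    {
        Delay *= 2.0;
    }
    return std::min(Delay, MaxSeconds);
}
```

With a base of 1 second and a cap of 30 seconds, attempts 0 through 4 wait 1, 2, 4, 8, and 16 seconds, and every later attempt waits 30 seconds. Production code often adds random jitter on top of this so that many clients do not reconnect at the same instant.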
All event handlers include comprehensive validation:
void UAsyncRealtime::HandleMessage(const FString& Message)
{
if (bIsShuttingDown)
{
return;
}
// Parse and validate JSON
TSharedPtr<FJsonObject> JsonObject;
TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(Message);
if (!FJsonSerializer::Deserialize(Reader, JsonObject) || !JsonObject.IsValid())
{
UE_LOG(LogSupabaseRealtime, Error, TEXT("Failed to parse JSON message: %s"), *Message);
return;
}
// Safe field access with validation
if (!JsonObject->HasField(TEXT("event")))
{
UE_LOG(LogSupabaseRealtime, Warning, TEXT("Message missing event field: %s"), *Message);
return;
}
// Continue processing...
}
Built-in protection against event flooding:
// Heartbeat rate limiting
static constexpr float MIN_HEARTBEAT_INTERVAL = 5.0f;
void UAsyncRealtime::SendHeartbeat()
{
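// FPlatformTime::Seconds() returns double; keep it (and LastHeartbeatTime) as double to avoid precision loss
const double CurrentTime = FPlatformTime::Seconds();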
if (CurrentTime - LastHeartbeatTime < MIN_HEARTBEAT_INTERVAL)
{
return; // Skip if too frequent
}
LastHeartbeatTime = CurrentTime;
// Send heartbeat...
}
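The same minimum-interval check can be factored into a small helper that takes the current time as a parameter, which makes it easy to unit test without a real clock. This is a sketch in plain standard C++; `FMinIntervalGate` and `TryPass` are hypothetical names, not plugin API.

```cpp
// Hypothetical rate limiter: allows an action at most once per
// MinIntervalSeconds. The caller supplies the current time in seconds.
class FMinIntervalGate
{
public:
    explicit FMinIntervalGate(double InMinIntervalSeconds)
        : MinIntervalSeconds(InMinIntervalSeconds)
    {
    }

    // Returns true and records NowSeconds if the first call, or if at least
    // MinIntervalSeconds have elapsed since the last accepted call.
    // Rejected calls do not reset the interval.
    bool TryPass(double NowSeconds)
    {
        if (bHasPassed && NowSeconds - LastPassSeconds < MinIntervalSeconds)
        {
            return false;
        }
        LastPassSeconds = NowSeconds;
        bHasPassed = true;
        return true;
    }

private:
    double MinIntervalSeconds;
    double LastPassSeconds = 0.0;
    bool bHasPassed = false;
};
```

Tracking `bHasPassed` separately means the very first call always passes, regardless of what the clock value happens to be at startup.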
// Preferred approach
USupabaseSubsystem* Subsystem = USupabaseManager::GetSupabaseSubsystem(this);
// Use subsystem events for centralized management
virtual void BeginDestroy() override
{
SafeCleanup();
Super::BeginDestroy();
}
void HandleRealtimeSuccess()
{
// Connection established successfully
UE_LOG(LogTemp, Log, TEXT("Real-time connection active"));
}
void HandleRealtimeFailure(const FString& Error)
{
// Handle connection failure gracefully
UE_LOG(LogTemp, Warning, TEXT("Real-time connection failed: %s"), *Error);
}
void HandleDataChange(const FString& EventType, const FString& PayloadData)
{
// Always validate payload before processing
TSharedPtr<FJsonObject> JsonObject;
TSharedRef<TJsonReader<>> Reader = TJsonReaderFactory<>::Create(PayloadData);
if (FJsonSerializer::Deserialize(Reader, JsonObject) && JsonObject.IsValid())
{
// Process validated data
}
else
{
UE_LOG(LogTemp, Error, TEXT("Invalid event payload received"));
}
}
Events Not Firing
Memory Leaks
Call SafeCleanup() in BeginDestroy()
Thread Safety Errors
Use SafeBroadcastOnGameThread for async events
Enable detailed logging for event debugging:
; In your game's logging configuration (Config/DefaultEngine.ini)
[Core.Log]
LogSupabaseRealtime=VeryVerbose
LogTemp=Log
This documentation is maintained for Supabase UE5 Plugin v1.34.2. For the latest updates, visit our GitHub repository.