Version 1.6.1+ | Critical Feature | Production Ready
The Supabase UE5 Plugin implements comprehensive thread safety across all operations to ensure stable, concurrent execution in Unreal Engine's multi-threaded environment. This page details the thread safety architecture, implementation patterns, and best practices.
Thread safety is a critical requirement for production-grade Unreal Engine plugins. The Supabase plugin is designed so that its operations can be started from any thread, with delegate broadcasts marshaled to the game thread, while maintaining data consistency and preventing race conditions.
// Atomic state management in AsyncRealtime
std::atomic<ERealtimeConnectionState> ConnectionState;
mutable FCriticalSection ConnectionStateMutex;
mutable FCriticalSection DelegateMutex;
// Thread-safe state changes
void SetConnectionState(ERealtimeConnectionState NewState)
{
ERealtimeConnectionState OldState = ConnectionState.exchange(NewState);
// Safe broadcasting handled automatically
}
void SafeBroadcastOnGameThread(TFunction<void()> BroadcastFunction)
{
if (IsInGameThread())
{
BroadcastFunction();
}
else
{
TWeakObjectPtr<UAsyncRealtime> WeakThis = WeakSelf;
AsyncTask(ENamedThreads::GameThread, [WeakThis, BroadcastFunction]() {
if (WeakThis.IsValid() && !WeakThis->bIsShuttingDown) {
BroadcastFunction();
}
});
}
}
// Thread-safe cleanup with proper shutdown handling
void SafeCleanup()
{
bIsShuttingDown = true;
// Clear timers safely
if (UWorld* World = GetWorldFromContext())
{
FTimerManager& TimerManager = World->GetTimerManager();
// Clear all timers with validation
}
// Close WebSocket connection safely
if (WebSocket.IsValid())
{
WebSocket->OnConnected().Clear();
WebSocket->OnConnectionError().Clear();
WebSocket->Close();
WebSocket.Reset();
}
}
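The `exchange` call in `SetConnectionState` returns the previous value, which makes it possible to notify listeners only when the state actually changes. Below is a minimal standard C++ sketch of that idea. The `ConnState` enum and `StateTracker` class are hypothetical stand-ins for the plugin's types, not part of its API.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for ERealtimeConnectionState.
enum class ConnState { Disconnected, Connecting, Connected };

class StateTracker {
public:
    // Atomically swaps in the new state and reports whether it differs
    // from the previous one, so the caller can skip redundant broadcasts.
    bool Set(ConnState NewState) {
        const ConnState Old = State.exchange(NewState);
        if (Old != NewState) {
            Transitions.fetch_add(1);
            return true;
        }
        return false;
    }

    ConnState Get() const { return State.load(); }
    int TransitionCount() const { return Transitions.load(); }

private:
    std::atomic<ConnState> State{ConnState::Disconnected};
    std::atomic<int> Transitions{0};
};
```

Because the swap and the read of the old value happen in one atomic step, two threads setting the same state at the same time cannot both observe a change.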
All async operations follow this thread-safe pattern:
class SUPABASE_API UAsyncOperation : public UBlueprintAsyncActionBase
{
private:
// Thread-safe state tracking
FThreadSafeBool bIsActive;
FThreadSafeBool bIsShuttingDown;
// Weak self-reference for safe lambda captures
TWeakObjectPtr<UAsyncOperation> WeakSelf;
public:
virtual void Activate() override
{
WeakSelf = this;
bIsActive = true;
// Execute operation on appropriate thread
PerformOperation();
}
virtual void BeginDestroy() override
{
SafeCleanup();
Super::BeginDestroy();
}
};
The subsystem ensures thread-safe access to shared resources:
// Thread-safe subsystem pattern
UCLASS()
class SUPABASE_API USupabaseSubsystem : public UGameInstanceSubsystem
{
private:
mutable FCriticalSection ClientMutex;
public:
USupabaseClient* GetClient()
{
FScopeLock Lock(&ClientMutex);
return Client;
}
void SetConnectionState(ESupabaseConnectionState NewState)
{
FScopeLock Lock(&ClientMutex);
ConnectionState = NewState;
// Broadcast state change safely
}
};
Thread-safe connection handling:
// Multiple connections with thread safety
class FSupabaseConnectionPool
{
private:
FCriticalSection PoolMutex;
TArray<TSharedPtr<USupabaseConnection>> AvailableConnections;
public:
TSharedPtr<USupabaseConnection> AcquireConnection()
{
FScopeLock Lock(&PoolMutex);
// Thread-safe connection acquisition
}
void ReleaseConnection(TSharedPtr<USupabaseConnection> Connection)
{
FScopeLock Lock(&PoolMutex);
// Thread-safe connection release
}
};
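The acquire and release bodies above are elided. One way they could look is sketched here in standard C++, with `std::mutex` standing in for `FCriticalSection` and a hypothetical `Connection` struct as the pooled type. The real pool's types and growth policy may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

struct Connection { int Id; };  // hypothetical placeholder type

class ConnectionPool {
public:
    explicit ConnectionPool(int Size) {
        for (int i = 0; i < Size; ++i)
            Available.push_back(std::make_shared<Connection>(Connection{i}));
    }

    // Returns an idle connection, or nullptr when the pool is exhausted.
    std::shared_ptr<Connection> Acquire() {
        std::lock_guard<std::mutex> Lock(PoolMutex);
        if (Available.empty()) return nullptr;
        std::shared_ptr<Connection> C = std::move(Available.back());
        Available.pop_back();
        return C;
    }

    // Hands a connection back to the pool; null handles are ignored.
    void Release(std::shared_ptr<Connection> C) {
        if (!C) return;
        std::lock_guard<std::mutex> Lock(PoolMutex);
        Available.push_back(std::move(C));
    }

    std::size_t IdleCount() const {
        std::lock_guard<std::mutex> Lock(PoolMutex);
        return Available.size();
    }

private:
    mutable std::mutex PoolMutex;
    std::vector<std::shared_ptr<Connection>> Available;
};
```

Returning `nullptr` on exhaustion keeps `Acquire()` non-blocking. A caller that prefers to wait would need a condition variable or a retry policy on top of this.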
The plugin includes comprehensive thread safety tests:
class SUPABASETESTS_API FThreadSafetyTest : public FSupabaseTestBase
{
public:
void TestConcurrentAsyncOperations()
{
const int32 ThreadCount = 10;
const int32 OperationsPerThread = 100;
TAtomic<int32> SuccessCount{0};
TAtomic<int32> ErrorCount{0};
// Launch concurrent operations
ParallelFor(ThreadCount, [&](int32 ThreadIndex) {
for (int32 OpIndex = 0; OpIndex < OperationsPerThread; OpIndex++)
{
UAsyncOperation* Op = UAsyncOperation::OperationAsync(TestWorld, TestParams);
Op->OnSuccess.AddLambda([&](const FString& Result) { SuccessCount++; });
Op->OnError.AddLambda([&](const FString& Error) { ErrorCount++; });
}
});
// Validate thread safety
WaitForAsyncCompletion();
TestTrue(TEXT("Thread safety violation"),
(SuccessCount + ErrorCount) == (ThreadCount * OperationsPerThread));
}
};
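The plugin minimizes mutex overhead by using atomic flags and counters for simple state and reserving FCriticalSection locks for updates that span multiple fields.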
Recommended concurrent operation limits:
| Operation Type | Max Concurrent |
|---|---|
| Query Operations | 50 |
| Realtime Connections | 10 |
| File Uploads | 5 |
| Authentication | 10 |
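One way to respect limits like these in calling code is a small counting gate that rejects work once the cap is reached. The sketch below is standard C++ and the `OperationGate` name is hypothetical. Whether the plugin enforces these limits internally is not stated here, so treat this as a caller-side illustration only.

```cpp
#include <atomic>
#include <cassert>

class OperationGate {
public:
    explicit OperationGate(int MaxConcurrent) : Max(MaxConcurrent) {}

    // Claims a slot if one is free. The compare-exchange loop ensures that
    // two threads can never both take the last remaining slot.
    bool TryEnter() {
        int Current = InFlight.load();
        while (Current < Max) {
            if (InFlight.compare_exchange_weak(Current, Current + 1))
                return true;
            // On failure, Current has been reloaded with the latest value.
        }
        return false;
    }

    // Frees a slot previously claimed with TryEnter().
    void Leave() {
        const int Previous = InFlight.fetch_sub(1);
        assert(Previous > 0 && "Leave() without a matching TryEnter()");
        (void)Previous;
    }

    int Active() const { return InFlight.load(); }

private:
    const int Max;
    std::atomic<int> InFlight{0};
};
```

A caller would create one gate per operation type, for example `OperationGate UploadGate(5);`, and queue or drop requests when `TryEnter()` returns false.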
// DO: Use weak references in lambdas
TWeakObjectPtr<UMyActor> WeakThis = this;
AsyncOp->OnSuccess.AddLambda([WeakThis](const FString& Result) {
if (WeakThis.IsValid()) {
// Safe to use WeakThis
}
});
// DON'T: Capture raw pointers
AsyncOp->OnSuccess.AddLambda([this](const FString& Result) {
// Potentially unsafe if object is destroyed
});
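The same hazard exists outside Unreal, and `std::weak_ptr` gives a runnable analogue of the `TWeakObjectPtr` capture. In this sketch `Listener` and `MakeCallback` are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>

struct Listener {
    std::string LastResult;
};

// Builds a callback that only touches the listener if it is still alive.
std::function<bool(const std::string&)> MakeCallback(
    const std::shared_ptr<Listener>& Target) {
    std::weak_ptr<Listener> Weak = Target;
    return [Weak](const std::string& Result) {
        if (std::shared_ptr<Listener> Strong = Weak.lock()) {
            Strong->LastResult = Result;
            return true;   // delivered to a live listener
        }
        return false;      // listener destroyed; the callback is a safe no-op
    };
}
```

The weak handle does not extend the listener's lifetime, so a late callback simply does nothing instead of dereferencing freed memory.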
// DO: Use atomic operations for simple state
std::atomic<bool> bIsConnected{false};
// DO: Use mutex for complex state
FCriticalSection StateMutex;
void UpdateComplexState()
{
FScopeLock Lock(&StateMutex);
// Modify complex state safely
}
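// DO: Always broadcast on game thread. Capturing `this` is acceptable here only
// because SafeBroadcastOnGameThread re-checks WeakSelf before invoking the lambda.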
SafeBroadcastOnGameThread([this]() {
OnSuccess.Broadcast(Result);
});
// DON'T: Direct broadcast from worker threads
OnSuccess.Broadcast(Result); // Potentially unsafe
// Enable detailed thread safety logging
SUPABASE_LOG(LogSupabaseRealtime, Log,
TEXT("Thread ID: %u, Operation: %s"),
FPlatformTLS::GetCurrentThreadId(),
*OperationName);
#if WITH_EDITOR
checkf(IsInGameThread(),
TEXT("Operation must be called from game thread"));
#endif
Use Unreal's built-in profiling tools to monitor:
Symptom: Random crashes during event broadcasting
Solution: Always use SafeBroadcastOnGameThread()
Symptom: Connection state out of sync
Solution: Use atomic operations for state changes
Symptom: Memory usage grows with concurrent operations
Solution: Ensure proper cleanup in BeginDestroy()
Symptom: Lost connections under high load
Solution: Implement proper connection pooling
The realtime subsystem uses a layered approach to thread safety, combining lightweight atomic primitives for simple flags with full mutex protection for complex data structures. This section documents the specific mechanisms used and provides patterns for writing thread-safe event handlers.
FThreadSafeBool is UE's atomic boolean, built on the interlocked operations of FThreadSafeCounter, and is used for all boolean flags that may be read or written from any thread. These flags are the first line of defense against race conditions in the async operation lifecycle.
// Used throughout AsyncRealtime for lifecycle management
FThreadSafeBool bIsActive; // True between Activate() and BeginDestroy()
FThreadSafeBool bIsShuttingDown; // True once teardown has started
FThreadSafeBool bIsConnected; // True when WebSocket transport is open
FThreadSafeBool bHasPendingData; // True when queued data awaits processing
// Thread-safe check -- no lock required
if (!bIsShuttingDown && bIsActive)
{
// Safe to proceed; the object is live and operational
}
// Atomic set -- other threads see the change immediately
bIsShuttingDown = true;
Because FThreadSafeBool reads and writes are individually atomic, no FCriticalSection is needed for simple flag checks. However, if a flag must be read and acted upon as part of a larger state transition, wrap the entire block in a mutex (see FCriticalSection below).
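For the common "start shutdown exactly once" case, a single atomic exchange combines the read and the write, so no mutex is needed. Here is a standard C++ sketch of that approach. The `Lifecycle` class is hypothetical, and `std::atomic<bool>` plays the role of `FThreadSafeBool`, which is believed to offer comparable set-and-return-previous behaviour through `AtomicSet`. Check your engine version before relying on that.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

class Lifecycle {
public:
    // Returns true for exactly one caller, even under contention:
    // exchange() writes true and returns the previous value in one step.
    bool BeginShutdown() {
        return !bIsShuttingDown.exchange(true);
    }

    bool IsShuttingDown() const { return bIsShuttingDown.load(); }

private:
    std::atomic<bool> bIsShuttingDown{false};
};

// Races ThreadCount threads against BeginShutdown() and counts the winners.
int CountShutdownWinners(int ThreadCount) {
    Lifecycle Object;
    std::atomic<int> Winners{0};
    std::vector<std::thread> Threads;
    for (int i = 0; i < ThreadCount; ++i) {
        Threads.emplace_back([&] {
            if (Object.BeginShutdown()) Winners.fetch_add(1);
        });
    }
    for (std::thread& T : Threads) T.join();
    return Winners.load();
}
```

Only the thread that sees `false` come back from `exchange` runs the teardown. Every other caller can return immediately.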
Numeric counters that are incremented or decremented from multiple threads use TAtomic<int32>, UE's counterpart to std::atomic<int32>, so that every increment and decrement is applied atomically without a mutex.
// Atomic counters in AsyncRealtime
TAtomic<int32> PendingOperations{0}; // Outstanding async requests
TAtomic<int32> ReconnectAttempts{0}; // Current reconnection attempt number
TAtomic<int64> BytesReceived{0}; // Cumulative bytes received on this connection
// Thread-safe increment -- pre-increment returns the new value
int32 Outstanding = ++PendingOperations;
// Thread-safe decrement
--PendingOperations;
// Lock-free read
if (BytesReceived.Load() > 0)
{
UE_LOG(LogTemp, Log, TEXT("Received %lld bytes"), BytesReceived.Load());
}
Atomic counters are preferred over mutex-protected counters because they eliminate lock contention on hot paths such as the WebSocket receive loop.
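The difference between "returns the new value" and "returns the previous value" matters whenever a counter result drives a decision. In standard C++, whose atomics `TAtomic` closely mirrors, pre-increment yields the new value while `fetch_add` yields the old one. The function names in this sketch are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <thread>
#include <vector>

// Adds Chunk bytes from each of ThreadCount threads, Iterations times each,
// and returns the final total. No mutex is involved.
std::int64_t AccumulateBytes(int ThreadCount, int Iterations, std::int64_t Chunk) {
    std::atomic<std::int64_t> BytesReceived{0};
    std::vector<std::thread> Threads;
    for (int t = 0; t < ThreadCount; ++t) {
        Threads.emplace_back([&] {
            for (int i = 0; i < Iterations; ++i)
                BytesReceived.fetch_add(Chunk, std::memory_order_relaxed);
        });
    }
    for (std::thread& T : Threads) T.join();
    return BytesReceived.load();
}

// Shows the two return-value conventions side by side.
bool ReturnValueConventionsHold() {
    std::atomic<int> Pending{0};
    const int NewValue = ++Pending;             // value after the increment
    const int OldValue = Pending.fetch_add(1);  // value before the increment
    return NewValue == 1 && OldValue == 1 && Pending.load() == 2;
}
```

Relaxed ordering is enough for a pure statistic like a byte count because joining the threads already synchronizes the final read. Counters that guard other data need stronger ordering or a lock.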
When multiple fields must be read or written together as an atomic unit, the plugin uses FCriticalSection with FScopeLock. This ensures that no other thread can observe a partially-updated state.
// In UAsyncRealtime
mutable FCriticalSection ConnectionStateMutex; // Protects connection state transitions
mutable FCriticalSection DelegateMutex; // Protects delegate lists
mutable FCriticalSection PendingDataMutex; // Protects queued event data
// Example: thread-safe state transition
void UAsyncRealtime::SetConnectionState(ERealtimeConnectionState NewState)
{
ERealtimeConnectionState OldState;
{
// Hold the lock only while the state is read and updated
FScopeLock Lock(&ConnectionStateMutex);
OldState = ConnectionState;
ConnectionState = NewState;
}
// Broadcast after the lock is released. When already on the game thread,
// SafeBroadcastOnGameThread runs the lambda synchronously, so broadcasting
// under the lock would execute handlers while the mutex is held.
SafeBroadcastOnGameThread([this, OldState, NewState]()
{
if (!bIsShuttingDown)
{
OnConnectionStateChanged.Broadcast(NewState);
}
});
}
// Example: thread-safe data queue access
void UAsyncRealtime::EnqueuePendingEvent(const FString& EventPayload)
{
FScopeLock Lock(&PendingDataMutex);
PendingEvents.Add(EventPayload);
bHasPendingData = true;
}
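The consumer side of this queue is not shown above. A common approach is to swap the pending list out under the lock and process it afterwards, so handlers never run while the mutex is held. This standard C++ sketch works under that assumption. `EventQueue` and `Drain` are hypothetical names, and `std::mutex` stands in for `FCriticalSection`.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

class EventQueue {
public:
    void Enqueue(std::string Payload) {
        std::lock_guard<std::mutex> Lock(PendingMutex);
        Pending.push_back(std::move(Payload));
        bHasPending.store(true);
    }

    // Moves all queued events out while holding the lock only briefly.
    std::vector<std::string> Drain() {
        std::vector<std::string> Batch;
        {
            std::lock_guard<std::mutex> Lock(PendingMutex);
            Batch.swap(Pending);
            bHasPending.store(false);
        }
        return Batch;  // the caller processes these with no lock held
    }

    // Lock-free hint; the authoritative state is the list under the mutex.
    bool HasPending() const { return bHasPending.load(); }

private:
    std::mutex PendingMutex;
    std::vector<std::string> Pending;
    std::atomic<bool> bHasPending{false};
};
```

The flag is updated inside the same critical section as the list, which keeps the two consistent for any reader that also takes the lock.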
Unreal Engine delegates bound with AddDynamic must fire on the game thread. The plugin marshals all delegate invocations through SafeBroadcastOnGameThread, which checks the current thread and dispatches via AsyncTask(ENamedThreads::GameThread, ...) when necessary.
// Core helper -- used by every delegate broadcast in the plugin
void UAsyncRealtime::SafeBroadcastOnGameThread(TFunction<void()> BroadcastFn)
{
// Fast path: already on the game thread
if (IsInGameThread())
{
BroadcastFn();
return;
}
// Slow path: marshal to the game thread
TWeakObjectPtr<UAsyncRealtime> WeakThis = WeakSelf;
AsyncTask(ENamedThreads::GameThread, [WeakThis, BroadcastFn]()
{
// Re-check validity -- the object may have been GC'd while the
// task was in flight
if (WeakThis.IsValid() && !WeakThis->bIsShuttingDown)
{
BroadcastFn();
}
});
}
Key rules enforced by this pattern:
- Every Broadcast() call goes through SafeBroadcastOnGameThread.
- Lambdas capture a TWeakObjectPtr to prevent dangling references.
- The bIsShuttingDown guard prevents broadcasting after teardown begins.
The following example demonstrates a complete pattern for handling realtime events safely across threads:
// ---------------------------------------------------------------
// 1. Thread-safe event handler implementation
// ---------------------------------------------------------------
UFUNCTION()
void AGameSessionManager::OnRealtimeData(
const FString& EventType,
const FString& TableName,
const FJsonObjectWrapper& Data)
{
// This handler is guaranteed to run on the game thread because
// SafeBroadcastOnGameThread marshals it there. It is safe to
// access UPROPERTY members, spawn actors, etc.
if (EventType == TEXT("INSERT"))
{
HandleNewPlayerJoined(TableName, Data);
}
else if (EventType == TEXT("UPDATE"))
{
HandlePlayerStateChange(TableName, Data);
}
else if (EventType == TEXT("DELETE"))
{
HandlePlayerLeft(TableName, Data);
}
}
// ---------------------------------------------------------------
// 2. Setting up the subscription with thread-safe bindings
// ---------------------------------------------------------------
void AGameSessionManager::BeginPlay()
{
Super::BeginPlay();
TWeakObjectPtr<AGameSessionManager> WeakThis = this;
UAsyncRealtime* Subscription = UAsyncRealtime::SubscribeToRealtimeAdvanced(
this,
TEXT("game_sessions"),
{TEXT("INSERT"), TEXT("UPDATE"), TEXT("DELETE")},
ESupabaseKey::Service,
30.0f, // HeartbeatInterval
true, // bEnableReconnection
5, // MaxReconnectAttempts
30.0f, // ConnectionTimeout
TEXT("public")
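);
// Keep a member reference so EndPlay() can disconnect the subscription
// (ActiveSubscription is assumed to be a UPROPERTY UAsyncRealtime* member)
ActiveSubscription = Subscription;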
// Dynamic bind -- broadcast is marshaled to game thread automatically
Subscription->OnDataReceived.AddDynamic(
this, &AGameSessionManager::OnRealtimeData);
// Lambda bind with weak reference for additional safety
Subscription->OnConnectionStateChanged.AddLambda(
[WeakThis](ERealtimeConnectionState NewState)
{
if (WeakThis.IsValid())
{
WeakThis->HandleConnectionState(NewState);
}
});
Subscription->Activate();
}
// ---------------------------------------------------------------
// 3. Safe shutdown -- prevents callbacks after destruction
// ---------------------------------------------------------------
void AGameSessionManager::EndPlay(const EEndPlayReason::Type EndPlayReason)
{
if (ActiveSubscription)
{
ActiveSubscription->Disconnect();
// bIsShuttingDown is set internally by Disconnect/BeginDestroy,
// which prevents any pending SafeBroadcastOnGameThread lambdas
// from executing.
}
Super::EndPlay(EndPlayReason);
}
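The marshaling and shutdown guard that this example relies on can be modelled outside the engine. The sketch below is standard C++ and purely illustrative. `MainThreadQueue` is a hypothetical stand-in for `AsyncTask(ENamedThreads::GameThread, ...)`, while `std::weak_ptr` and an atomic flag play the roles of `TWeakObjectPtr` and `bIsShuttingDown`.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Collects tasks from any thread; the owning thread runs them via Pump().
class MainThreadQueue {
public:
    void Post(std::function<void()> Task) {
        std::lock_guard<std::mutex> Lock(Mutex);
        Tasks.push_back(std::move(Task));
    }
    void Pump() {
        std::vector<std::function<void()>> Batch;
        {
            std::lock_guard<std::mutex> Lock(Mutex);
            Batch.swap(Tasks);
        }
        for (auto& Task : Batch) Task();
    }
private:
    std::mutex Mutex;
    std::vector<std::function<void()>> Tasks;
};

struct Subscription {
    std::atomic<bool> bIsShuttingDown{false};
    int Delivered = 0;  // only touched from the pumping thread
};

// Posts a broadcast that is skipped if the target is gone or shutting down.
void SafeBroadcast(MainThreadQueue& Queue,
                   const std::shared_ptr<Subscription>& Target) {
    std::weak_ptr<Subscription> Weak = Target;
    Queue.Post([Weak] {
        std::shared_ptr<Subscription> Strong = Weak.lock();
        if (Strong && !Strong->bIsShuttingDown.load()) ++Strong->Delivered;
    });
}

// Posts from a worker thread, optionally disconnects, then pumps.
int DeliveredAfter(bool DisconnectBeforePump) {
    MainThreadQueue Queue;
    auto Sub = std::make_shared<Subscription>();
    std::thread Worker([&] { SafeBroadcast(Queue, Sub); });
    Worker.join();
    if (DisconnectBeforePump) Sub->bIsShuttingDown.store(true);
    Queue.Pump();
    return Sub->Delivered;
}
```

The validity and shutdown checks run at execution time, not at post time, which is why a task queued before `Disconnect()` is still dropped safely.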
Version 1.34.2+ introduces enhanced thread safety:
// Old pattern (unsafe)
AsyncOp->OnSuccess.AddDynamic(this, &AMyActor::OnSuccess);
// New pattern (thread-safe)
TWeakObjectPtr<AMyActor> WeakThis = this;
AsyncOp->OnSuccess.AddLambda([WeakThis](const FString& Result) {
if (WeakThis.IsValid()) {
WeakThis->OnSuccess(Result);
}
});
For thread safety issues or questions:
This documentation is maintained for Supabase UE5 Plugin v1.4.1+. For the latest information, always refer to the current version documentation.