Implementing synchronization for multiple threads in .NET is easy. There are a lot of options for doing that, for example the Monitor class. But what if we need to synchronize execution in a distributed system? One possible answer is using RavenDB!
How can RavenDB help us to synchronize execution in a distributed system?
RavenDB 4 supports Distributed Compare-Exchange Operations (I strongly recommend reading this article for a better understanding).
RavenDB uses a consensus protocol to manage much of its distributed state. The consensus is used to ensure consistency in a distributed system and it is open for users as well. You can use this feature to enable some interesting scenarios. (Oren Eini)
To put the idea simply: RavenDB provides a key/value interface on which you can perform distributed atomic operations, knowing that they are entirely consistent.
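To make the compare-exchange idea concrete, here is a minimal single-process sketch of the semantics. It is only a model and not the RavenDB client API: the class InMemoryCompareExchange and its TryPut/TryDelete methods are hypothetical names invented for illustration, and an ordinary lock stands in for the cluster-wide consensus. The rule it tries to capture is that a write is accepted only when the caller's expected index matches the stored one, with 0 meaning "the key must not exist yet".

```csharp
using System;
using System.Collections.Generic;

// Hypothetical single-process model of compare-exchange semantics.
// It is NOT the RavenDB client API; all names here are illustrative only.
public sealed class InMemoryCompareExchange<T>
{
    private readonly object _gate = new object();
    private readonly Dictionary<string, (T Value, long Index)> _items =
        new Dictionary<string, (T Value, long Index)>();
    private long _nextIndex = 1;

    // Writes only if expectedIndex matches the index currently stored for the key.
    // An expectedIndex of 0 means "the key must not exist yet".
    public (bool Successful, T Value, long Index) TryPut(string key, T value, long expectedIndex)
    {
        lock (_gate)
        {
            var exists = _items.TryGetValue(key, out var current);
            var currentIndex = exists ? current.Index : 0;

            if (currentIndex != expectedIndex)
            {
                // Rejected: report what is currently stored so the caller can decide what to do.
                return (false, current.Value, currentIndex);
            }

            // Accepted: every successful write gets a new, larger index.
            var newIndex = _nextIndex++;
            _items[key] = (value, newIndex);
            return (true, value, newIndex);
        }
    }

    // Deletes only if expectedIndex matches the index currently stored for the key.
    public bool TryDelete(string key, long expectedIndex)
    {
        lock (_gate)
        {
            if (_items.TryGetValue(key, out var current) && current.Index == expectedIndex)
            {
                return _items.Remove(key);
            }

            return false;
        }
    }
}
```

With this model, a first TryPut("locks/a", "client-1", 0) succeeds because the key is absent, while a second TryPut("locks/a", "client-2", 0) is rejected and returns the value and index that are already stored. That "first writer wins" behaviour is what a lock can be built on.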
Show me the code!
Here we go!
using System;
using System.Threading;
using Raven.Client.Documents;
using Raven.Client.Documents.Operations.CompareExchange;

public class DistributedLockObject
{
    public DateTime? ExpiresAt { get; set; }
}

public class RavenDbProcessMonitor : IProcessMonitor
{
    private readonly IDocumentStore _documentStore;

    public RavenDbProcessMonitor(IDocumentStore documentStore)
    {
        _documentStore = documentStore;
    }

    public long Enter(string lockKey)
    {
        while (true)
        {
            var now = DateTime.UtcNow;

            var lockObject = new DistributedLockObject
            {
                ExpiresAt = now + TimeSpan.FromMinutes(5)
            };

            // Index 0 means "create only if the key does not exist yet"
            var result = _documentStore.Operations.Send(
                new PutCompareExchangeValueOperation<DistributedLockObject>(
                    lockKey, lockObject, 0)
            );

            if (result.Successful)
            {
                // lockKey wasn't present - we managed to reserve it
                return result.Index;
            }

            if (result.Value.ExpiresAt < now)
            {
                // Time expired - update the existing key with the new value,
                // but only if nobody changed it since we read it
                var takeLockWithTimeoutResult = _documentStore.Operations.Send(
                    new PutCompareExchangeValueOperation<DistributedLockObject>(
                        lockKey, lockObject, result.Index));

                if (takeLockWithTimeoutResult.Successful)
                {
                    return takeLockWithTimeoutResult.Index;
                }
            }

            // Wait a little bit and retry
            Thread.Sleep(20);
        }
    }

    public void Exit(string lockKey, long lockIndex)
    {
        // Deletes the key only if its index still matches the one we acquired
        _documentStore.Operations.Send(
            new DeleteCompareExchangeValueOperation<DistributedLockObject>(
                lockKey, lockIndex)
        );
    }

    public IDisposable Lock(string lockKey) => new ProcessLock(this, lockKey);
}
The previous code is an adaptation of the example available in the official documentation.
The client code passes a “lock key” to try to acquire the lock. If the key is already in use, the code retries every 20 ms until the key is released or its five-minute expiration has passed.
RavenDB ensures that only one client will be able to acquire the lock, even in a cluster.
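The listing references IProcessMonitor and ProcessLock without showing them. What follows is a minimal sketch of what they might look like, inferred only from how they are used (Enter, Exit, Lock, and new ProcessLock(this, lockKey)). Both definitions are assumptions for illustration and not the author's actual code.

```csharp
using System;

// Hypothetical shape of the interface implemented by RavenDbProcessMonitor,
// inferred from the members used in the listing.
public interface IProcessMonitor
{
    long Enter(string lockKey);
    void Exit(string lockKey, long lockIndex);
    IDisposable Lock(string lockKey);
}

// Hypothetical helper: acquires the lock when constructed and releases it on Dispose,
// so that it can be used in a "using" block.
public sealed class ProcessLock : IDisposable
{
    private readonly IProcessMonitor _monitor;
    private readonly string _lockKey;
    private readonly long _lockIndex;
    private bool _disposed;

    public ProcessLock(IProcessMonitor monitor, string lockKey)
    {
        _monitor = monitor ?? throw new ArgumentNullException(nameof(monitor));
        _lockKey = lockKey ?? throw new ArgumentNullException(nameof(lockKey));

        // Blocks until the lock is acquired; keep the index so Exit releases our own lock.
        _lockIndex = _monitor.Enter(_lockKey);
    }

    public void Dispose()
    {
        // Release at most once, even if Dispose is called repeatedly.
        if (_disposed) return;
        _disposed = true;
        _monitor.Exit(_lockKey, _lockIndex);
    }
}
```

Keeping the index returned by Enter matters in this design: Exit passes it back, so a client whose lock has expired and been taken over by someone else does not delete the new owner's key.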
Here is a little usage example:
using System;
using Raven.Client.Documents;

public static class Program
{
    public static void Main()
    {
        var store = new DocumentStore
        {
            Database = "TheFlow",
            Urls = new[] {"http://localhost:8080"}
        };

        store.Initialize();

        var monitor = new RavenDbProcessMonitor(store);

        while (true)
        {
            Console.WriteLine("Press ENTER to lock..");
            Console.ReadLine();

            Console.WriteLine("Waiting for the lock...");

            using (monitor.Lock("sample/resource"))
            {
                Console.WriteLine("Locked..");
                Console.WriteLine("Press ENTER to release..");
                Console.ReadLine();
            }

            Console.WriteLine("Released...");
        }
        // ReSharper disable once FunctionNeverReturns
    }
}
To test it, just start multiple instances of this code.
Hi Elemar,
Is the CompareExchangeValueOperation a good approach to generating a sequential number that needs to be restarted at the beginning of the day?
Is there any other operation better than cmpXchg for that issue?
[]’s