...
The Engine defines primitive operations to concurrently access and modify data while maintaining consistency. Since it is desirable to build more complex routines from these operations (while also maintaining consistency in concurrent environments), we must build AtomicOperation protocol support at the Engine level, which ConcourseServer can use to safely implement complex routines for the client.
...
Atomic Operation Support in the Engine
- AtomicOperation is a BufferedStore that transports to the Engine (or a Transaction)
- AtomicOperation uses just-in-time locking, so it never blocks other operations before it goes to commit.
- When committing, AtomicOperation grabs locks to create a fence around the resources on which it will operate. It does not release these locks until it is done committing.
- AtomicOperation listens for version changes to the resources that it intends to lock. If it is notified about a version change, it fails immediately.
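The behavior listed above can be illustrated with a small, self-contained sketch. It is not the Engine's implementation: `ToyEngine`, `ToyAtomicOperation`, and `onVersionChange` are hypothetical names, resources are plain strings, and a single monitor on the toy engine stands in for the per-resource fence. Writes are buffered with no lock held, a version change made by another writer marks the operation as failed, and locks are taken only inside `commit()`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for the Engine (hypothetical, not Concourse's API). */
class ToyEngine {
    private final Map<String, String> data = new HashMap<>();
    private final Map<String, List<ToyAtomicOperation>> watchers = new HashMap<>();

    synchronized String read(String resource) {
        return data.get(resource);
    }

    /** Register an operation to be told when the resource's version changes. */
    synchronized void watch(String resource, ToyAtomicOperation operation) {
        watchers.computeIfAbsent(resource, k -> new ArrayList<>()).add(operation);
    }

    /** Primitive write: store the value, then notify every other watcher. */
    synchronized void write(String resource, String value, ToyAtomicOperation writer) {
        data.put(resource, value);
        List<ToyAtomicOperation> notified = watchers.remove(resource);
        if (notified != null) {
            for (ToyAtomicOperation operation : notified) {
                if (operation != writer) {
                    operation.onVersionChange();
                }
            }
        }
    }
}

/** Toy atomic operation: buffers writes and locks only while committing. */
class ToyAtomicOperation {
    private final ToyEngine engine;
    private final Map<String, String> buffer = new LinkedHashMap<>();
    private volatile boolean failed = false;

    private ToyAtomicOperation(ToyEngine engine) {
        this.engine = engine;
    }

    static ToyAtomicOperation start(ToyEngine engine) {
        return new ToyAtomicOperation(engine);
    }

    /** Buffer a write and register a watch; no lock is held once this returns. */
    void write(String resource, String value) {
        if (failed) {
            throw new IllegalStateException("operation already failed");
        }
        engine.watch(resource, this);
        buffer.put(resource, value);
    }

    /** Called by the engine when a watched resource changes version. */
    void onVersionChange() {
        failed = true;
    }

    /** Take the fence, apply the buffered writes, and release on exit. */
    boolean commit() {
        synchronized (engine) { // simplification: one monitor fences every resource
            if (failed) {
                return false;
            }
            for (Map.Entry<String, String> write : buffer.entrySet()) {
                engine.write(write.getKey(), write.getValue(), this);
            }
            return true;
        }
    }
}
```

If two toy operations buffer a write to the same resource, the first one to commit succeeds and the second is notified of the version change, so its `commit()` returns false and the caller can retry.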
Note |
---|
TODO: may have to move durability up from Transaction to AtomicOperation |
...
Atomic Functions in Concourse Server
All built-in atomic functions must be defined at the Server level and can only mutate data using the Engine-recognized primitive operations defined in AtomicOperation.
Link
- How do we determine that a link is valid?
- Ping the record? This will tell us if the record currently has any data. What about records with no current data but some historical data: should we be able to link to those?
- Pinging is not a primitive operation
Revert
REVERT key IN record TO timestamp
```java
import com.google.common.collect.Sets;
import java.util.Set;

void revert(String key, long record, long timestamp) {
    AtomicOperation operation = null;
    // Retry until an operation is built and commits successfully.
    while (operation == null || !operation.commit()) {
        operation = doRevert(key, record, timestamp,
                transaction != null ? transactions.get(transaction) : engine);
    }
}

AtomicOperation doRevert(String key, long record, long timestamp, Compoundable store) {
    AtomicOperation operation = AtomicOperation.start(store);
    Set<TObject> past = operation.fetch(key, record, timestamp);
    Set<TObject> present = operation.fetch(key, record);
    // Values that are in exactly one of the two states must be flipped.
    Set<TObject> xor = Sets.symmetricDifference(past, present);
    try {
        for (TObject value : xor) {
            if (present.contains(value)) {
                operation.remove(key, value, record); // added since the timestamp
            } else {
                operation.add(key, value, record); // removed since the timestamp
            }
        }
        return operation;
    } catch (AtomicStateException e) {
        return null; // signals the caller to start over
    }
}
```
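To see why the symmetric difference is enough to revert, here is a minimal worked sketch on plain string sets. `RevertSketch` and its `revert` method are hypothetical and use only `java.util` rather than the Engine or Guava; the sketch applies the same add/remove rule to a copy of the present state and returns the result.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

class RevertSketch {
    /** Apply the revert rule to a copy of present and return the new state. */
    static Set<String> revert(Set<String> past, Set<String> present) {
        // Symmetric difference: values in exactly one of the two sets.
        Set<String> xor = new HashSet<>(past);
        xor.addAll(present);
        Set<String> both = new HashSet<>(past);
        both.retainAll(present);
        xor.removeAll(both);

        Set<String> result = new TreeSet<>(present);
        for (String value : xor) {
            if (present.contains(value)) {
                result.remove(value); // only in present: added later, so remove it
            } else {
                result.add(value); // only in past: removed later, so add it back
            }
        }
        return result;
    }
}
```

With past = {a, b} and present = {b, c}, the symmetric difference is {a, c}: c is removed and a is added, which leaves {a, b}, the past state. Values common to both states are never touched.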
Clear
CLEAR key IN record
```java
boolean clear(String key, long record) {
    AtomicOperation operation = null;
    // Retry until the operation commits successfully.
    while (operation == null || !operation.commit()) {
        operation = doClear(key, record,
                transaction != null ? transactions.get(transaction) : engine);
    }
    return true;
}

AtomicOperation doClear(String key, long record, Store store) {
    AtomicOperation operation = AtomicOperation.start(store);
    // Remove every value currently stored for key in record.
    Set<TObject> values = operation.fetch(key, record);
    for (TObject value : values) {
        operation.remove(key, value, record);
    }
    return operation;
}
```
Set
SET key AS value IN record
```java
// NOTE: set() cannot call clear(); the removes must be issued through the
// same atomic operation.
boolean set(String key, TObject value, long record) {
    AtomicOperation operation = null;
    // Retry until the operation commits successfully.
    while (operation == null || !operation.commit()) {
        operation = doSet(key, value, record,
                transaction != null ? transactions.get(transaction) : engine);
    }
    return true;
}

AtomicOperation doSet(String key, TObject value, long record, Store store) {
    AtomicOperation operation = AtomicOperation.start(store);
    // Remove every existing value, then add the new one, in one operation.
    Set<TObject> values = operation.fetch(key, record);
    for (TObject v : values) {
        operation.remove(key, v, record);
    }
    operation.add(key, value, record);
    return operation;
}
```
Verify And Swap
See http://en.wikipedia.org/wiki/Compare-and-swap
...
```java
// NOTE: this method cannot retry automatically, so the caller must handle retries.
boolean verifyAndSwap(String key, TObject expected, long record, TObject replacement) {
    // store is not defined in the original snippet; it is assumed to be the
    // Engine or the active Transaction, resolved as in the functions above.
    AtomicOperation operation = AtomicOperation.start(store);
    if (operation.verify(key, expected, record)) {
        operation.remove(key, expected, record);
        operation.add(key, replacement, record);
        return operation.commit();
    } else {
        return false;
    }
}
```
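Because the retry is left to the caller, a caller-side loop might look like the sketch below. Everything in it is an assumption made for illustration: `CasRetrySketch` is a hypothetical in-memory stand-in that stores `long` values in a `ConcurrentHashMap`, and its `verifyAndSwap` only mirrors the argument order of the function above rather than calling the Server.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class CasRetrySketch {
    // Hypothetical in-memory storage keyed by "key@record"; not Concourse's API.
    private final ConcurrentMap<String, Long> cells = new ConcurrentHashMap<>();

    private static String cell(String key, long record) {
        return key + "@" + record;
    }

    /** Read the current value, treating a missing cell as 0. */
    long get(String key, long record) {
        return cells.computeIfAbsent(cell(key, record), k -> 0L);
    }

    /** Swap in replacement only if the stored value still equals expected. */
    boolean verifyAndSwap(String key, long expected, long record, long replacement) {
        return cells.replace(cell(key, record), expected, replacement);
    }

    /** Caller-side retry: re-read and try again whenever the swap fails. */
    long increment(String key, long record) {
        while (true) {
            long current = get(key, record);
            if (verifyAndSwap(key, current, record, current + 1)) {
                return current + 1;
            }
            // Another writer changed the value between the read and the swap.
        }
    }
}
```

The caller re-reads the value on every attempt, so a failed swap never overwrites a concurrent update; it only costs another pass through the loop.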
Get And Increment
See http://en.wikipedia.org/wiki/Fetch-and-add
...