Introduction
The Engine defines primitive operations to concurrently access/modify data while maintaining consistency. Since it is desirable to build more complex routines from these operations (while also maintaining consistency in concurrent environments), we must build support for atomic operations at the Engine level, which ConcourseServer can use to safely implement complex routines.
Atomic Operation Support in the Engine
- AtomicOperation is a BufferedStore that transports to the Engine (or a Transaction).
- AtomicOperation uses just-in-time locking, so it never blocks other operations before it commits.
- When committing, AtomicOperation grabs locks to create a fence around the resources on which it will operate. It does not release these locks until it is done committing.
- AtomicOperation listens for version changes to the resources that it intends to lock. If it is notified about a version change it fails immediately.
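The behavior listed above (buffer writes, take no locks before commit, fail on a competing version change) can be sketched as follows. This is a minimal illustration, not the Engine's actual code: `ToyEngine` and `ToyAtomicOperation` are hypothetical names, the store holds one string per key, and a single coarse lock on the engine stands in for the per-resource locks described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the Engine: one value per key, announces every write. */
final class ToyEngine {
    private final Map<String, String> data = new HashMap<>();
    private final List<ToyAtomicOperation> listeners = new ArrayList<>();

    synchronized String read(String key) {
        return data.get(key);
    }

    synchronized void write(String key, String value) {
        data.put(key, value);
        // A write is a version change: notify every operation still in flight.
        for (ToyAtomicOperation op : new ArrayList<>(listeners)) {
            op.onVersionChange(key);
        }
    }

    synchronized void register(ToyAtomicOperation op) { listeners.add(op); }

    synchronized void unregister(ToyAtomicOperation op) { listeners.remove(op); }
}

/** Hypothetical sketch of an atomic operation; assumed to be driven by one thread. */
final class ToyAtomicOperation {
    private final ToyEngine engine;
    private final Map<String, String> buffer = new HashMap<>();        // pending writes
    private final Set<String> touched = ConcurrentHashMap.newKeySet(); // keys to fence at commit
    private volatile boolean failed = false;

    private ToyAtomicOperation(ToyEngine engine) { this.engine = engine; }

    static ToyAtomicOperation start(ToyEngine engine) {
        ToyAtomicOperation op = new ToyAtomicOperation(engine);
        engine.register(op);
        return op;
    }

    String read(String key) {
        touched.add(key);
        return buffer.containsKey(key) ? buffer.get(key) : engine.read(key);
    }

    void write(String key, String value) {
        touched.add(key);
        buffer.put(key, value); // nothing reaches the engine before commit
    }

    /** Called by the engine; a change to any touched key fails the operation. */
    void onVersionChange(String key) {
        if (touched.contains(key)) {
            failed = true;
        }
    }

    boolean commit() {
        synchronized (engine) { // the fence: held until the commit is finished
            engine.unregister(this); // our own writes must not fail us
            if (failed) {
                return false;
            }
            for (Map.Entry<String, String> entry : buffer.entrySet()) {
                engine.write(entry.getKey(), entry.getValue());
            }
            return true;
        }
    }

    public static void main(String[] args) {
        ToyEngine engine = new ToyEngine();
        ToyAtomicOperation op = ToyAtomicOperation.start(engine);
        op.read("name");
        engine.write("name", "changed elsewhere"); // competing write
        System.out.println("committed: " + op.commit());
    }
}
```

Because reads and writes only touch the buffer, a sketch like this holds the lock only for the duration of `commit()`, which is the property the list above describes.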
Atomic Functions in Concourse Server
All built-in atomic functions must be defined at the Server level and can only mutate data using the engine-recognized primitive operations defined in AtomicOperation.
Link
- How do we determine that a link is valid?
- Ping the record? This will tell us if the record currently has any data. What about records with no current data but some historical data? Should we be able to link to those?
- Pinging is not a primitive operation
Revert
REVERT key IN record TO timestamp
Code Block |
---|
void revert(String key, long record, long timestamp){
    AtomicOperation operation = null;
    // Retry until an operation is built and commits successfully
    while (operation == null || !operation.commit()) {
        operation = doRevert(key, record, timestamp, transaction != null ? transactions.get(transaction) : engine);
    }
}
AtomicOperation doRevert(String key, long record, long timestamp, Compoundable store) {
    AtomicOperation operation = AtomicOperation.start(store);
    Set<TObject> past = operation.fetch(key, record, timestamp);
    Set<TObject> present = operation.fetch(key, record);
    // Values in exactly one of past/present are the ones that must change
    Set<TObject> xor = Sets.symmetricDifference(past, present);
    try {
        for (TObject value : xor) {
            if(present.contains(value)) {
                operation.remove(key, value, record);
            }
            else {
                operation.add(key, value, record);
            }
        }
        return operation;
    }
    catch (AtomicStateException e) {
        // Returning null makes revert() start over with a new operation
        return null;
    }
} |
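The set arithmetic behind the revert can be checked in isolation. The sketch below uses plain `java.util` sets of strings instead of `TObject` values and an AtomicOperation, and the class and method names are hypothetical. It shows that removing the values found only in the present and adding the values found only in the past turns the present set into the past set.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class RevertSketch {

    /** Elements contained in exactly one of the two sets. */
    static <T> Set<T> symmetricDifference(Set<T> a, Set<T> b) {
        Set<T> result = new HashSet<>(a);
        for (T item : b) {
            // add() returns false when the item is already there, i.e. it is in both sets
            if (!result.add(item)) {
                result.remove(item);
            }
        }
        return result;
    }

    /** Applies the same remove/add decisions as doRevert to a copy of present. */
    static <T> Set<T> revert(Set<T> past, Set<T> present) {
        Set<T> current = new HashSet<>(present);
        for (T value : symmetricDifference(past, present)) {
            if (present.contains(value)) {
                current.remove(value); // only in the present: take it out
            } else {
                current.add(value);    // only in the past: put it back
            }
        }
        return current;
    }

    public static void main(String[] args) {
        Set<String> past = new HashSet<>(Arrays.asList("a", "b"));
        Set<String> present = new HashSet<>(Arrays.asList("b", "c"));
        System.out.println(revert(past, present).equals(past));
    }
}
```

Values present in both sets never appear in the symmetric difference, so they are left untouched and generate no writes.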
Clear
CLEAR key IN record
Code Block |
---|
boolean clear(String key, long record){
    AtomicOperation operation = null;
    // Retry until the operation commits successfully
    while(operation == null || !operation.commit()){
        operation = doClear(key, record, transaction != null ? transactions.get(transaction) : engine);
    }
    return true;
}
AtomicOperation doClear(String key, long record, Store store){
    AtomicOperation operation = AtomicOperation.start(store);
    Set<TObject> values = operation.fetch(key, record);
    // Remove every value currently stored for key in record
    for (TObject value : values){
        operation.remove(key, value, record);
    }
    return operation;
} |
Set
SET key AS value IN record
Code Block |
---|
// NOTE: CANNOT use the clear() method but must do removes in terms of the atomic operation!!
boolean set(String key, TObject value, long record){
    AtomicOperation operation = null;
    while(operation == null || !operation.commit()){
        operation = doSet(key, value, record, transaction != null ? transactions.get(transaction) : engine);
    }
    return true;
}
AtomicOperation doSet(String key, TObject value, long record, Store store){
    AtomicOperation operation = AtomicOperation.start(store);
    Set<TObject> values = operation.fetch(key, record);
    for (TObject v : values){
        operation.remove(key, v, record);
    }
    operation.add(key, value, record);
    return operation;
} |
Verify And Swap
See http://en.wikipedia.org/wiki/Compare-and-swap
VERIFY key AS value1 IN record AND SWAP WITH value2
Code Block |
---|
// NOTE: cannot do automatic retry for this method, so caller must handle that
boolean verifyAndSwap(String key, TObject expected, long record, TObject replacement){
    // store is assumed to be the Engine or the active Transaction, as in the functions above
    AtomicOperation operation = AtomicOperation.start(store);
    if(operation.verify(key, expected, record)){
        operation.remove(key, expected, record);
        operation.add(key, replacement, record);
        return operation.commit();
    }
    else{
        return false;
    }
} |
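Since the caller has to handle a failed swap, it may help to see what that looks like from the calling side. The sketch below is an analogy, not Concourse code: it assumes a single value per key and uses the standard `ConcurrentMap.replace(key, oldValue, newValue)` as the compare-and-swap primitive, and the class name is hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class VerifyAndSwapSketch {
    private final ConcurrentMap<String, String> values = new ConcurrentHashMap<>();

    void put(String key, String value) {
        values.put(key, value);
    }

    String get(String key) {
        return values.get(key);
    }

    /** Swaps in replacement only if the key currently maps to expected. */
    boolean verifyAndSwap(String key, String expected, String replacement) {
        return values.replace(key, expected, replacement);
    }

    public static void main(String[] args) {
        VerifyAndSwapSketch store = new VerifyAndSwapSketch();
        store.put("status", "draft");

        String seen = store.get("status");
        store.put("status", "review"); // someone else changes the value first

        // The expected value is stale, so nothing is swapped.
        boolean swapped = store.verifyAndSwap("status", seen, "published");
        if (!swapped) {
            // The caller decides what to do: here it re-reads and tries once more.
            seen = store.get("status");
            swapped = store.verifyAndSwap("status", seen, "published");
        }
        System.out.println(swapped + " " + store.get("status"));
    }
}
```

Whether to re-read and retry, as this caller does, or to give up is an application decision, which is one reason the retry is left outside the function.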
Get And Increment
See http://en.wikipedia.org/wiki/Fetch-and-add
GET key AS value IN record AND INCREMENT
...
Number getAndIncrement(String key,long record) |
Warning |
---|
The first value for key in record (get returns the first value of a fetch) must be a Number, otherwise this method should throw an exception. TODO: how to handle floats and doubles? |
Code Block |
---|
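Number getAndIncrement(String key, long record){
    while(true){
        // get returns the first value of a fetch; it must be a Number (see the warning above)
        Number value = get(key, record);
        // "value + 1" is shorthand: the numeric type handling is still a TODO
        // verifyAndSwap fails if the value changed in the meantime, so read again and retry
        if(verifyAndSwap(key, value, record, value + 1)){
            return value;
        }
    }
} |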
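The read, compare-and-swap, retry loop above is the usual way to build fetch-and-add from compare-and-swap. As a runnable analogy (not Concourse code), the sketch below applies the same loop to a `java.util.concurrent.atomic.AtomicLong` and runs it from several threads. The class name and the `runContended` helper are hypothetical, and only whole numbers are covered, so the float/double question stays open.

```java
import java.util.concurrent.atomic.AtomicLong;

final class GetAndIncrementSketch {
    private final AtomicLong cell = new AtomicLong();

    /** Returns the value seen before the increment, retrying on contention. */
    long getAndIncrement() {
        while (true) {
            long value = cell.get();
            // Succeeds only if nobody changed the value since it was read
            if (cell.compareAndSet(value, value + 1)) {
                return value;
            }
        }
    }

    long current() {
        return cell.get();
    }

    /** Runs the loop from several threads and returns the final counter value. */
    static long runContended(int threads, int incrementsPerThread) {
        GetAndIncrementSketch counter = new GetAndIncrementSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.getAndIncrement();
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return counter.current();
    }

    public static void main(String[] args) {
        System.out.println(runContended(4, 1000));
    }
}
```

A failed compare-and-swap means another thread won the race, so the loop only repeats when some other increment has succeeded and no update is lost.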