Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YCSB+T: Added transactions to DB and DBWrapper and validate() to Workloa... #169

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 96 additions & 61 deletions core/src/main/java/com/yahoo/ycsb/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -309,57 +309,76 @@ public void run()
//NOTE: Switching to using nanoTime and parkNanos for time management here such that the measurements
// and the client thread have the same view on time.

//spread the thread operations out so they don't all hit the DB at the same time
// GH issue 4 - throws exception if _target>1 because random.nextInt argument must be >0
// and the sleep() doesn't make sense for granularities < 1 ms anyway
if ((_targetOpsPerMs > 0) && (_targetOpsPerMs <= 1.0))
{
long randomMinorDelay = Utils.random().nextInt((int) _targetOpsTickNs);
sleepUntil(System.nanoTime() + randomMinorDelay);
}
try
{
if (_dotransactions)
{
long startTimeNanos = System.nanoTime();

while (((_opcount == 0) || (_opsdone < _opcount)) && !_workload.isStopRequested())
{

if (!_workload.doTransaction(_db,_workloadstate))
{
break;
}

_opsdone++;

throttleNanos(startTimeNanos);
}
}
else
{
long startTimeNanos = System.nanoTime();

while (((_opcount == 0) || (_opsdone < _opcount)) && !_workload.isStopRequested())
{

if (!_workload.doInsert(_db,_workloadstate))
{
break;
}

_opsdone++;

throttleNanos(startTimeNanos);
}
}
}
catch (Exception e)
{
e.printStackTrace();
e.printStackTrace(System.out);
System.exit(0);
}
//spread the thread operations out so they don't all hit the DB at the same time
try
{
//GH issue 4 - throws exception if _target>1 because random.nextInt argument must be >0
//and the sleep() doesn't make sense for granularities < 1 ms anyway
if ( (_targetOpsPerMs>0) && (_targetOpsPerMs<=1.0) )
{
sleep(Utils.random().nextInt((int)(1.0/_targetOpsTickNs)));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks unrelated to the change mentioned in the PR description, could you leave it out?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry about the format changes. I could merge it again if that helps.

}
}
catch (InterruptedException e)
{
// do nothing.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is probably wrong. we should at a minimum set the interrupted status of the thread.

}

try
{
if (_dotransactions)
{
long startTimeNanos = System.nanoTime();

while (((_opcount == 0) || (_opsdone < _opcount)) && !_workload.isStopRequested())
{

try {
_db.start();
if (_workload.doTransaction(_db,_workloadstate)) {
_db.commit();
} else {
_db.abort();
}
} catch (DBException e) {
throw new WorkloadException(e);
}

_opsdone++;

throttleNanos(startTimeNanos);
}
}
else
{
long startTimeNanos = System.nanoTime();

while (((_opcount == 0) || (_opsdone < _opcount)) && !_workload.isStopRequested())
{

try {
_db.start();
if (_workload.doInsert(_db,_workloadstate)) {
_db.commit();
} else {
_db.abort();
}
} catch (DBException e) {
throw new WorkloadException(e);
}

_opsdone++;

throttleNanos(startTimeNanos);
}
}
}
catch (Exception e)
{
e.printStackTrace();
e.printStackTrace(System.out);
System.exit(0);
}

try
{
Expand Down Expand Up @@ -930,16 +949,32 @@ public void run()
}
}

try
{
workload.cleanup();
}
catch (WorkloadException e)
{
e.printStackTrace();
e.printStackTrace(System.out);
System.exit(0);
}
try {
DB db = DBFactory.newDB(dbname,props);
db.init();
if (workload.validate(db))
System.out.println("Database validation succeeded");
else
System.out.println("Database validation failed");
} catch (WorkloadException e) {
System.out.println("Database validation failed with error: " + e.getMessage());
} catch (UnknownDBException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (DBException e) {
e.printStackTrace();
}

try
{
workload.cleanup();
}
catch (WorkloadException e)
{
e.printStackTrace();
e.printStackTrace(System.out);
System.exit(0);
}

try
{
Expand Down
21 changes: 21 additions & 0 deletions core/src/main/java/com/yahoo/ycsb/DB.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,27 @@ public void cleanup() throws DBException
{
}

/**
* Start a database transaction.
*/
public void start() throws DBException
{
}

/**
* Commit the current database transaction.
*/
public void commit() throws DBException
{
}

/**
* Abort the current database transaction.
*/
public void abort() throws DBException
{
}

/**
* Read a record from the database. Each field/value pair from the result will be stored in a HashMap.
*
Expand Down
46 changes: 46 additions & 0 deletions core/src/main/java/com/yahoo/ycsb/DBWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,52 @@ public void cleanup() throws DBException
measure("CLEANUP", Status.OK, ist, st, en);
}

public void start() throws DBException
{
long st=System.nanoTime();
try {
_db.start();
long en = System.nanoTime();
_measurements.measure("START", (int) ((en - st) / 1000));
_measurements.reportStatus("START", Status.OK);
} catch (DBException e) {
long en=System.nanoTime();
_measurements.measure("START",(int)((en-st)/1000));
_measurements.reportStatus("START", Status.ERROR);
throw e;
}
}

public void commit() throws DBException
{
long st=System.nanoTime();
try {
_db.commit();
long en = System.nanoTime();
_measurements.measure("COMMIT", (int) ((en - st) / 1000));
_measurements.reportStatus("COMMIT", Status.OK);
} catch (DBException e) {
long en = System.nanoTime();
_measurements.measure("ABORT", (int) ((en - st) / 1000));
_measurements.reportStatus("ABORT", Status.ERROR);
}
}

public void abort() throws DBException
{
long st=System.nanoTime();
try {
_db.abort();
long en = System.nanoTime();
_measurements.measure("ABORT", (int) ((en - st) / 1000));
_measurements.reportStatus("ABORT", Status.OK);
} catch (DBException e) {
long en=System.nanoTime();
_measurements.measure("ABORT",(int)((en-st)/1000));
_measurements.reportStatus("ABORT", Status.ERROR);
}
}

/**
* Read a record from the database. Each field/value pair from the result
* will be stored in a HashMap.
Expand Down
19 changes: 16 additions & 3 deletions core/src/main/java/com/yahoo/ycsb/Workload.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ public void cleanup() throws WorkloadException
* other, and it will be difficult to reach the target throughput. Ideally, this function would have no side
* effects other than DB operations and mutations on threadstate. Mutations to threadstate do not need to be
* synchronized, since each thread has its own threadstate instance.
* @throws WorkloadException
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what purpose does the exception serve?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It says whether the workload operation (mostly for validate()) was successful or not. This is necessary since we now have the ability to determine whether a database operation was successful or not within the context of the transaction.

*/
public abstract boolean doInsert(DB db, Object threadstate);
public abstract boolean doInsert(DB db, Object threadstate) throws WorkloadException;

/**
* Do one transaction operation. Because it will be called concurrently from multiple client threads, this
Expand All @@ -91,9 +92,11 @@ public void cleanup() throws WorkloadException
* synchronized, since each thread has its own threadstate instance.
*
* @return false if the workload knows it is done for this thread. Client will terminate the thread. Return true otherwise. Return true for workloads that rely on operationcount. For workloads that read traces from a file, return true when there are more to do, false when you are done.
* @throws WorkloadException
*/
public abstract boolean doTransaction(DB db, Object threadstate);

public abstract boolean doTransaction(DB db, Object threadstate) throws WorkloadException;


/**
* Allows scheduling a request to stop the workload.
*/
Expand All @@ -109,4 +112,14 @@ public boolean isStopRequested() {
if (stopRequested.get() == true) return true;
else return false;
}

/**
* Perform validation of the database db after the workload has executed.
*
* @return false if the workload left the database in an inconsistent state, true if it is consistent.
* @throws WorkloadException
*/
public boolean validate(DB db) throws WorkloadException {
return true;
}
}
Loading