view runtime/transaction.c @ 139:a68e6828d896

Global stores and transactions are working. Definitely leaks memory on retries. Probably a fair number of bugs to work out. However, a basic test program works.
author Mike Pavone <pavone@retrodev.com>
date Fri, 19 Nov 2010 04:04:14 -0500
parents
children ba35ab624ec2
line wrap: on
line source

#include "transaction.h"
#include <stdarg.h>
#include <string.h>
#include <stdlib.h>

// Global lock: every read or write of a mutable_object's data/version fields in this file
// happens between rh_lock(trans_lock) and rh_unlock(trans_lock).
// rh_mutex is defined outside this file -- presumably a macro that defines the mutex object (TODO confirm).
rh_mutex(trans_lock)

/*
 * Find the cell that tracks `to_find`, searching `trans` first and then every
 * transaction reachable through its `chain` links.
 *
 * Returns a pointer to the first matching cell, or NULL if `trans` is NULL or
 * no transaction along the chain has a cell for that object.
 */
trans_cell * find_obj_cell(transaction * trans, mutable_object * to_find)
{
	int32_t idx;
	for (; trans; trans = trans->chain)
	{
		for (idx = 0; idx < trans->num_cells; ++idx)
		{
			//Must be a comparison: an assignment here would overwrite the cell's
			//object pointer and match the first cell for any non-NULL to_find
			if (trans->cells[idx].obj == to_find)
				return &(trans->cells[idx]);
		}
	}
	return NULL;
}

/*
 * Start a new transaction on `ct`. If ct->transaction is already set, the new
 * transaction is nested inside it (it becomes the new transaction's parent).
 *
 * ct       context whose `transaction` pointer is replaced by the new one
 * numobjs  number of variadic arguments that follow
 * ...      numobjs values of type `mutable_object *`, one per cell
 *
 * For every object a cell is filled in with a snapshot of its current value
 * and version: taken from the parent's cell when the parent (or its chain)
 * already tracks the object, otherwise from the object itself under
 * trans_lock. local_version starts at 0 for every cell.
 *
 * NOTE(review): the malloc result is not checked; the function returns void so
 * there is no way to report failure to the caller from here.
 */
void begin_transaction(context * ct, int numobjs,...)
{
	mutable_object *obj;
	transaction *parent, *trans;
	trans_cell *cell;
	va_list args;
	size_t extra_cells;
	int32_t idx,got_global_lock=0;
	
	parent = ct->transaction;
	
	//The size formula subtracts one cell (presumably the struct declares the
	//first cell itself); clamp so numobjs == 0 cannot wrap the size around
	extra_cells = numobjs > 1 ? (size_t)(numobjs - 1) : 0;
	trans = malloc(sizeof(transaction)+(extra_cells*sizeof(trans_cell)));
	ct->transaction = trans;
	trans->parent = parent;
	trans->chain = NULL;
	//Initialise the transaction's own lock: that is the one nested transactions
	//take (parent->lock) and the one commit_transaction deletes
	rh_mutex_init(trans->lock);
	trans->num_cells = numobjs;
	va_start(args, numobjs);
	if (parent)
	{
		rh_lock(parent->lock);
			for (idx = 0; idx < numobjs; ++idx)
			{
				obj = va_arg(args, mutable_object *);
				cell = &(trans->cells[idx]);
				cell->obj = obj;
				cell->parent = find_obj_cell(parent, obj);
				if (cell->parent)
				{
					//Take our own reference, matching the root branch below: commit
					//either releases a cell's local_data or hands it on
					cell->local_data = add_ref(cell->parent->local_data);
					cell->orig_version = cell->parent->local_version;
				} else {
					//Object is not tracked by the parent; read it from the global
					//store. The global lock is taken lazily and only once.
					if (!got_global_lock)
					{
						rh_lock(trans_lock);
						got_global_lock = 1;
					}
					cell->local_data = add_ref(obj->data);
					cell->orig_version = obj->version;
				}
				cell->local_version = 0;
			}
			if (got_global_lock)
			{
				rh_unlock(trans_lock);
			}
		rh_unlock(parent->lock);
	} else {
		rh_lock(trans_lock);
			for (idx = 0; idx < numobjs; ++idx)
			{
				obj = va_arg(args, mutable_object *);
				cell = &(trans->cells[idx]);
				cell->obj = obj;
				cell->parent = NULL;
				cell->local_data = add_ref(obj->data);
				cell->orig_version = obj->version;
				cell->local_version = 0;
			}
		rh_unlock(trans_lock);
	}
	//Every va_start needs a matching va_end before the function returns
	va_end(args);
}

/*
 * Free `trans` and every transaction linked after it through `chain`.
 * Only the transaction structs themselves are passed to free(); a NULL
 * argument is a no-op.
 */
void free_trans(transaction * trans)
{
	transaction *next;
	while (trans)
	{
		//Grab the link before the struct that holds it is freed
		next = trans->chain;
		free(trans);
		trans = next;
	}
}

/*
 * Try to commit the current transaction of `ct`.
 *
 * ct        context whose ct->transaction is being committed
 * readonly  only consulted for a top-level transaction (one without a
 *           parent): when non-zero the snapshots are released without
 *           validating or writing anything
 *
 * Nested transaction (has a parent): under the parent's lock, every cell in
 * the transaction and its chain is validated. A cell linked to a parent cell
 * conflicts when the parent cell's local_version no longer equals the cell's
 * orig_version; an unlinked cell conflicts when the parent's chain already
 * contains its object. Without conflicts, modified linked cells are written
 * into their parent cells and unlinked cells are copied into a new
 * transaction pushed onto the front of the parent's chain.
 *
 * Top-level transaction: under trans_lock, every cell's orig_version is
 * compared with the object's version; without conflicts, modified cells are
 * stored into the objects and the object versions are incremented.
 *
 * Returns 1 on success; the transaction is then freed and ct->transaction is
 * set to the parent. Returns 0 on conflict; nothing is freed and
 * ct->transaction is left untouched.
 */
int32_t commit_transaction(context * ct, int32_t readonly)
{
	transaction *tmp_trans = NULL, *current;
	trans_cell *cell;
	object * tmp_obj;
	//numaddparent must start at 0: it is only ever incremented and then tested
	int32_t idx,numaddparent=0;

	if (ct->transaction->parent)
	{
		rh_lock(ct->transaction->parent->lock);
			//Pass 1: validate and count the cells that need a new home in the parent
			for (current = ct->transaction; current; current = current->chain)
			{
				for (idx = 0; idx < current->num_cells; ++idx)
				{
					cell = &(current->cells[idx]);
					if (cell->parent)
					{
						if (cell->parent->local_version != cell->orig_version)
						{
							rh_unlock(ct->transaction->parent->lock);
							return 0;
						}
					} else if (find_obj_cell(ct->transaction->parent->chain, cell->obj)) {
						rh_unlock(ct->transaction->parent->lock);
						return 0;
					} else {
						numaddparent++;
					}
				}
			}
			if (numaddparent)
			{
				tmp_trans = malloc(sizeof(transaction)+(numaddparent - 1)*sizeof(trans_cell));
				tmp_trans->chain = ct->transaction->parent->chain;
				tmp_trans->num_cells = 0;
				ct->transaction->parent->chain = tmp_trans;
			}
			//Pass 2: merge. Index through `current`, not ct->transaction, so that
			//chained cells are merged too and head cells are merged only once.
			for (current = ct->transaction; current; current = current->chain)
			{
				for (idx = 0; idx < current->num_cells; ++idx)
				{
					cell = &(current->cells[idx]);
					if (cell->parent)
					{
						//Only commit a particular object if a change has been made
						if (cell->local_version)
						{
							tmp_obj = cell->parent->local_data;
							cell->parent->local_data = cell->local_data;
							release_ref(tmp_obj);
							cell->parent->local_version++;
						} else {
							release_ref(cell->local_data);
						}
					} else {
						//tmp_trans is non-NULL here: pass 1 counted this cell
						tmp_trans->cells[tmp_trans->num_cells++] = *cell;
					}
				}
			}
		rh_unlock(ct->transaction->parent->lock);
	} else {
		if(readonly)
		{
			//Walk the chain as well: free_trans below frees chained transactions
			//without releasing the references their cells hold
			for (current = ct->transaction; current; current = current->chain)
			{
				for (idx = 0; idx < current->num_cells; ++idx)
				{
					release_ref(current->cells[idx].local_data);
				}
			}
		} else {
			rh_lock(trans_lock);
				//Validate everything first so a conflict leaves the objects untouched
				for (current = ct->transaction; current; current = current->chain)
				{
					for (idx = 0; idx < current->num_cells; ++idx)
					{
						cell = &(current->cells[idx]);
						if (cell->obj->version != cell->orig_version)
						{
							rh_unlock(trans_lock);
							return 0;
						}
					}
				}
				for (current = ct->transaction; current; current = current->chain)
				{
					for (idx = 0; idx < current->num_cells; ++idx)
					{
						cell = &(current->cells[idx]);
						//Only commit a particular object if a change has been made
						if (cell->local_version)
						{
							tmp_obj = cell->obj->data;
							cell->obj->data = cell->local_data;
							release_ref(tmp_obj);
							cell->obj->version++;
						} else {
							release_ref(cell->local_data);
						}
					}
				}
			rh_unlock(trans_lock);
		}
	}
	rh_mutex_del(ct->transaction->lock);
	tmp_trans = ct->transaction->parent;
	free_trans(ct->transaction);
	ct->transaction = tmp_trans;
	return 1;
}

/*
 * Reset the current transaction of `ct` so its body can be run again.
 *
 * For every cell of the transaction and its chain, the held local_data is
 * released, local_version is reset to 0 and a fresh snapshot (value and
 * version) is taken: from the parent's cell when the transaction is nested
 * and the parent tracks the object, otherwise from the object itself under
 * trans_lock. A cell that was not linked to a parent cell is looked up again,
 * so it becomes linked if the parent now tracks the object.
 */
void prep_retry(context * ct)
{
	transaction * current;
	trans_cell * cell;
	int32_t idx,got_global_lock=0;
	if (ct->transaction->parent)
	{
		rh_lock(ct->transaction->parent->lock);
			for (current = ct->transaction; current; current = current->chain)
			{
				for (idx = 0; idx < current->num_cells; ++idx)
				{
					cell = &(current->cells[idx]);
					release_ref(cell->local_data);
					cell->local_version = 0;
					if (!cell->parent)
						cell->parent = find_obj_cell(ct->transaction->parent, cell->obj);
					if (cell->parent)
					{
						//The old reference was just released, so the new snapshot needs
						//its own; commit_transaction releases or hands on one per cell
						cell->local_data = add_ref(cell->parent->local_data);
						cell->orig_version = cell->parent->local_version;
					} else {
						//Global lock is taken lazily, once, and dropped after the loops
						if (!got_global_lock)
						{
							rh_lock(trans_lock);
							got_global_lock = 1;
						}
						cell->local_data = add_ref(cell->obj->data);
						cell->orig_version = cell->obj->version;
					}
				}
			}
			if (got_global_lock)
			{
				rh_unlock(trans_lock);
			}
		rh_unlock(ct->transaction->parent->lock);
	} else {
		rh_lock(trans_lock);
			for (current = ct->transaction; current; current = current->chain)
			{
				for (idx = 0; idx < current->num_cells; ++idx)
				{
					cell = &(current->cells[idx]);
					release_ref(cell->local_data);
					cell->local_version = 0;
					//Same as the snapshot begin_transaction takes for a top-level transaction
					cell->local_data = add_ref(cell->obj->data);
					cell->orig_version = cell->obj->version;
				}
			}
		rh_unlock(trans_lock);
	}
}