This guide describes how to write a TriplestoreConnector
implementation
for Trippi. Once implemented, you can use your own triplestore as
the back-end for the Trippi console,
the Trippi Query Web Service, and
other applications.
Before setting out, you should understand:
It's easiest to explain this with a concrete example.
Let's assume you're writing a TriplestoreConnector for a new triplestore called MyStore, which is accessed via JDBC and has it's own SQL-style query language, called "MyQL", for doing RDF queries. MyStore supports multiple concurrent connections.
First, you should:
org.trippi.TripleIterator
org.trippi.TupleIterator
org.trippi.impl.base.TriplestoreSession
org.trippi.impl.base.TriplestoreSessionFactory
,
which is a simple provider of your kinds of sessions. It should use a single DBCP connection pool (
see here for
sample code to construct a DBCP DataSource)
to construct new MyTriplestoreSession instances.
org.trippi.TriplestoreConnector
.
This class is will be responsible for returning a single TriplestoreWriter instance whenenver
getReader or getWriter is called. This instance should be an instance of
org.trippi.impl.base.ConcurrentTriplestoreWriter
.
At this point, you should be able to construct your own instances of all the classes necessary to
construct one of these (see the constructor, and also the source code of org.trippi.impl.kowari.KowariConnector
to see what is needed and how it can be done).Partial implementations of the first several classes you need to write follow. If you're using this code as a template to write a JDBC-backed implementation, pay particular attention to text in bold -- it's what you'll need to change/implement in your particular situation.
This class will be used to support your implementation of TriplestoreSession.findTriples(SubjectNode, PredicateNode, ObjectNode). Once written, you should be able to test this class by giving it an actual JDBC ResultSet/Statement pair from a query to your underlying triplestore that returns a subject, predicate, and object for each result row.
package org.example.mystore.trippi; import java.sql.Connection; import java.sql.ResultSet; import java.sql.Statement; // from jrdf jar import org.jrdf.graph.Triple; // from trippi jar import org.trippi.TripleIterator; import org.trippi.TrippiException; public class MyTripleIterator extends TripleIterator { private ResultSet m_resultSet; private Statement m_statement; private Connection m_connection; private Triple m_nextTriple; private boolean m_closed; public MyTripleIterator(ResultSet resultSet, Statement statement, Connection connection) throws TrippiException { m_resultSet = resultSet; m_statement = statement; m_connection = connection; m_nextTriple = getNextTriple(); } private Triple getNextTriple() throws TrippiException { try { if (m_resultSet.next()) { return getTripleFromCurrentRow(); } else { return null; // signals no more results } } catch (Throwable th) { throw new TrippiException("Error moving to next " + "ResultSet row", th); } } private Triple getTripleFromCurrentRow() throws TrippiException { try { // // TODO: Convert the subject, predicate, and object in the current // m_resultSet row into an org.jrdf.graph.Triple and return it // throw new Exception("Not implemented"); } catch (Throwable th) { throw new TrippiException("Error getting triple from current " + "ResultSet row", th); } } public boolean hasNext() throws TrippiException { return (m_nextTriple != null); } public Triple next() throws TrippiException { if (m_nextTriple == null) { return null; } else { Triple thisTriple = m_nextTriple; m_nextTriple = getNextTriple(); return thisTriple; } } public void close() throws TrippiException { if (!m_closed) { try { m_resultSet.close(); m_statement.close(); m_connection.close(); m_closed = true; } catch (Throwable th) { throw new TrippiException("Error closing MyTripleIterator", th); } } } }
This class will be used to support your implementation of TriplestoreSession.query(String, String) It can be tested much like MyTripleIterator.java. In this case, the JDBC ResultSet is expected to contain an arbitrary number of bound variables.
package org.example.mystore.trippi; import java.sql.Connection; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.Statement; import java.util.Map; // from trippi jar import org.trippi.TrippiException; import org.trippi.TupleIterator; public class MyTupleIterator extends TupleIterator { private ResultSet m_resultSet; private Statement m_statement; private Connection m_connection; private String[] m_columnNames; private Map m_nextTuple; private boolean m_closed; public MyTupleIterator(ResultSet resultSet, Statement statement, Connection connection) throws TrippiException { m_resultSet = resultSet; m_statement = statement; m_connection = connection; m_columnNames = getColumnNames(); m_nextTuple = getNextTuple(); } private String[] getColumnNames() throws TrippiException { try { ResultSetMetaData md = m_resultSet.getMetaData(); String[] names = new String[md.getColumnCount()]; for (int i = 0; i < names.length; i++) { names[i] = md.getColumnName(i); } return names; } catch (Throwable th) { throw new TrippiException("Error getting ResultSet metadata", th); } } public String[] names() { return m_columnNames; } private Map getNextTuple() throws TrippiException { try { if (m_resultSet.next()) { return getTupleFromCurrentRow(); } else { return null; // signals no more results } } catch (Throwable th) { throw new TrippiException("Error moving to next ResultSet row", th); } } private Map getTupleFromCurrentRow() throws TrippiException { try { // // TODO: Convert all named values in the current m_resultSet row // into JRDF Node objects, put these in a Map keyed by name, // and return it. // throw new Exception("Not implemented"); } catch (Throwable th) { throw new TrippiException("Error getting tuple from current " + "ResultSet row", th); } } public boolean hasNext() throws TrippiException { return (m_nextTuple != null); } public Map next() throws TrippiException { if (m_nextTuple == null) { return null; } else { Map thisTuple = m_nextTuple; m_nextTuple = getNextTuple(); return thisTuple; } } public void close() throws TrippiException { if (!m_closed) { try { m_resultSet.close(); m_statement.close(); m_connection.close(); m_closed = true; } catch (Throwable th) { throw new TrippiException("Error closing MyTupleIterator", th); } } } }
This uses the above classes to help implement the TriplestoreSession interface.
package org.example.mystore.trippi; import java.sql.Connection; import java.sql.ResultSet; import java.sql.Statement; import java.util.Iterator; import java.util.Set; // from commons-dbcp jar import org.apache.commons.dbcp.BasicDataSource; // from jrdf jar import org.jrdf.graph.ObjectNode; import org.jrdf.graph.PredicateNode; import org.jrdf.graph.SubjectNode; import org.jrdf.graph.Triple; // from trippi jar import org.trippi.TripleIterator; import org.trippi.TrippiException; import org.trippi.TupleIterator; import org.trippi.impl.base.TriplestoreSession; public class MyTriplestoreSession implements TriplestoreSession { private static final String _SPO = "spo"; private static final String _MYQL = "myql"; private static final String[] _TRIPLE_LANGS = new String[] { _SPO }; private static final String[] _TUPLE_LANGS = new String[] { _MYQL }; private BasicDataSource m_dbPool; public MyTriplestoreSession(BasicDataSource dbPool) { m_dbPool = dbPool; } public String[] listTripleLanguages() { return _TRIPLE_LANGS; } public String[] listTupleLanguages() { return _TUPLE_LANGS; } // Find triples in a language other than SPO (SPO is parsed at a higher // level, then passed to the other findTriples method) public TripleIterator findTriples(String lang, String queryText) throws TrippiException { throw new TrippiException("Unsupported triple query language: " + lang); } // This is used to support the required SPO query language public TripleIterator findTriples(SubjectNode subject, PredicateNode predicate, ObjectNode object) throws TrippiException { Connection conn = null; Statement stmt = null; ResultSet rset = null; try { conn = m_dbPool.getConnection(); stmt = conn.createStatement(); rset = stmt.executeQuery(getTripleQuery(subject, predicate, object)); return new MyTripleIterator(rset, stmt, conn); } catch (Throwable th) { if (rset != null) { try { rset.close(); } catch (Exception e) { } } if (stmt != null) { try { stmt.close(); } catch (Exception e) { } } if (conn != null) { try { conn.close(); } catch (Exception e) { } } throw new TrippiException("Error querying for triple pattern", th); } } private String getTripleQuery(SubjectNode subject, PredicateNode predicate, ObjectNode object) { // // TODO: Convert the given JRDF nodes to a database query that returns // three values per row. Note that the values for this method // may be given as null, which means "any". // return "SELECT subject, predicate, object FROM ... etc, etc"; } public TupleIterator query(String query, String lang) throws TrippiException { if (lang != _MYQL) { throw new TrippiException("Unsupported tuple query language: " + lang); } Connection conn = null; Statement stmt = null; ResultSet rset = null; try { conn = m_dbPool.getConnection(); stmt = conn.createStatement(); rset = stmt.executeQuery(replaceAliasesInMYQLQuery(query)); return new MyTupleIterator(rset, stmt, conn); } catch (Throwable th) { if (rset != null) { try { rset.close(); } catch (Exception e) { } } if (stmt != null) { try { stmt.close(); } catch (Exception e) { } } if (conn != null) { try { conn.close(); } catch (Exception e) { } } throw new TrippiException("Error querying for tuples", th); } } private String replaceAliasesInMYQLQuery(String myqlQuery) { // // TODO: Trippi supports the notion of persistent aliases that can // be used across multiple queries (as opposed temporal aliases // that are specified in each query). // // If these are not supported by the underlying triplestore, // substitutions to the query text (or a special part of the query // that specifies per-query aliases) can be made here, before it // is sent to the server. // // If they are supported by the underlying triplestore, setting // them to be recognized by the server would be done at a higher // level than this. For instance, in the implementation of the // TriplestoreReader's setAliasMap method. // return myqlQuery; } public void add(Set triples) throws TrippiException { doTriples(triples, false); } public void delete(Set triples) throws TrippiException { doTriples(triples, true); } private void doTriples(Set triples, boolean delete) throws TrippiException { Connection conn = null; Statement stmt = null; boolean startedTransaction = false; boolean committedTransaction = false; try { conn = m_dbPool.getConnection(); conn.setAutoCommit(false); startedTransaction = true; stmt = conn.createStatement(); Iterator iter = triples.iterator(); while (iter.hasNext()) { Triple triple = (Triple) iter.next(); String updateString; if (delete) { updateString = getDeleteString(triple); } else { updateString = getInsertString(triple); } stmt.executeUpdate(updateString); } conn.commit(); committedTransaction = true; } catch (Throwable th) { if (startedTransaction) { try { conn.rollback(); } catch (Exception e) { } } String action; if (delete) { action = "deleting"; } else { action = "adding"; } throw new TrippiException("Error " + action + " triples", th); } finally { if (startedTransaction) { try { conn.setAutoCommit(true); } catch (Exception e) { } } if (stmt != null) { try { stmt.close(); } catch (Exception e) { } } if (conn != null) { try { conn.close(); } catch (Exception e) { } } } } private String getInsertString(Triple triple) { // // TODO: Get the appropriate SQL for adding the given JRDF Triple // return "INSERT INTO xyz, etc, etc"; } private String getDeleteString(Triple triple) { // // TODO: Get the appropriate SQL for deleting the given JRDF Triple // return "DELETE FROM xyz, etc, etc"; } public void close() throws TrippiException { // Do nothing -- a higher level class needs to close the underlying db // pool because it may be used by multiple TriplestoreSession objects } }
Using the commons-dbcp package from Apache, this code shows how to construct a BasicDataSource that can be used as a pool of database connections.
import org.apache.commons.dbcp.BasicDataSource; import org.apache.commons.dbcp.BasicDataSourceFactory; ... String dbDriverClassName = "org.mystore.jdbc.Driver"; // dbProperties are used to configure the pool and contain things from // http://jakarta.apache.org/commons/dbcp/configuration.html Properties dbProperties = new Properties(); // connectionProperties are connection-specific properties // and aren't required, but if they are needed, are defined by the // underlying database vendor Properties connectionProperties = new Properties(); BasicDataSource pool; try { Class.forName(dbDriverClassName); pool = (BasicDataSource) BasicDataSourceFactory .createDataSource(dbProperties); pool.setDriverClassName(dbDriverClassName); Enumeration e = connectionProperties.propertyNames(); while (e.hasMoreElements()) { String name = (String) e.nextElement(); pool.addConnectionProperty(name, (String) connectionProperties.getProperty(name)); } } catch (Exception e) { throw new TrippiException("Unable to initialize DataSource", e); }