org.osesb.scheduledjobs.databasepoller
Class DefaultDatabasePollerScheduledJob
java.lang.Object
org.osesb.scheduledjobs.ScheduledJob
org.osesb.scheduledjobs.databasepoller.DefaultDatabasePollerScheduledJob
- All Implemented Interfaces:
- java.lang.Runnable, org.quartz.Job, org.quartz.StatefulJob
public class DefaultDatabasePollerScheduledJob
- extends ScheduledJob
DefaultDatabasePollerScheduledJob is the default database poller class.
DefaultDatabasePollerScheduledJob extends org.osesb.scheduledjobs.ScheduledJob.
ScheduledJob(s) can be configured as Quartz-based Jobs or as Runnable(s).
ScheduledJob(s) implement the org.quartz.StatefulJob and Runnable
interfaces. ScheduledJob(s) include a method named executeJob(),
which descendants override to do the real work.
When configured as a Quartz Job, org.quartz.Job#execute() invokes
executeJob(). When configured as a Runnable job, run() invokes
executeJob().
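The delegation described above can be sketched as follows. This is a minimal illustration, not the osESB source: SketchScheduledJob and CountingJob are hypothetical names, and the real execute() takes an org.quartz.JobExecutionContext, which is omitted here so that the sketch compiles without Quartz.

```java
// Hypothetical stand-in for the pattern described above; not the osESB ScheduledJob class.
abstract class SketchScheduledJob implements Runnable {

    /** Descendants override this method to do the real work. */
    protected abstract void executeJob() throws Exception;

    /** Stand-in for the Quartz entry point (the real method takes a JobExecutionContext). */
    public void execute() {
        invokeExecuteJob();
    }

    /** Runnable entry point. */
    @Override
    public void run() {
        invokeExecuteJob();
    }

    // Both entry points funnel into executeJob(); checked exceptions are wrapped
    // because Runnable#run() cannot declare them.
    private void invokeExecuteJob() {
        try {
            executeJob();
        } catch (Exception e) {
            throw new IllegalStateException("executeJob() failed", e);
        }
    }
}

/** Example descendant that only counts how often executeJob() was invoked. */
class CountingJob extends SketchScheduledJob {
    int timesExecuted;

    @Override
    protected void executeJob() {
        timesExecuted++;
    }
}
```

Whichever entry point the scheduler uses, the descendant only needs to supply executeJob().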
The DefaultDatabasePollerScheduledJob#executeJob() method does the following:
- Uses a descendant of org.osesb.connectors.jdbc.TableSet to process
database updates. DefaultDatabasePollerScheduledJob uses
SelectAndCheckpointProcessedRowsTableSetAdapter, which scans database tables for
changes.
- Sends a message to an osESB transport handler using an
org.osesb.msgrouting.ClientGateway.
- Uses SelectAndCheckpointProcessedRowsTableSetAdapter to checkpoint processed
rows so that rows are not processed more than once.
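The three steps above amount to a select, send, checkpoint cycle. The following in-memory sketch shows the ordering only. Row, Gateway and pollOnce are hypothetical names introduced for illustration, and a boolean flag stands in for the checkpoint column; none of this is the osESB API.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory sketch of one poll cycle. None of these types are osESB classes.
class PollCycleSketch {

    /** A table row; the 'processed' flag stands in for a checkpoint column. */
    static final class Row {
        final int id;
        boolean processed;

        Row(int id) {
            this.id = id;
        }
    }

    /** Stand-in for a gateway that delivers the selected rows as one message. */
    interface Gateway {
        void send(List<Integer> rowIds);
    }

    /** Runs one poll cycle and returns the number of rows that were checkpointed. */
    static int pollOnce(List<Row> table, Gateway gateway) {
        // 1. Select the rows that have not been checkpointed yet.
        List<Row> pending = new ArrayList<>();
        for (Row row : table) {
            if (!row.processed) {
                pending.add(row);
            }
        }
        if (pending.isEmpty()) {
            return 0;
        }

        // 2. Send them. If send() throws, nothing below runs, so the rows
        //    stay un-checkpointed and are picked up again by the next cycle.
        List<Integer> ids = new ArrayList<>();
        for (Row row : pending) {
            ids.add(row.id);
        }
        gateway.send(ids);

        // 3. Checkpoint only after the send succeeded.
        for (Row row : pending) {
            row.processed = true;
        }
        return pending.size();
    }
}
```

Checkpointing after the send means a failed send leads to a retry rather than a lost row, at the cost of a possible duplicate if the failure happens between the send and the checkpoint.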
A TableSet is a configurable connector for interacting with relational databases.
TableSet(s) are included in the org.osesb.connectors.jdbc package.
A TableSet uses a TableSetAdapter to accomplish specific interactions, such as
selecting rows from a database, updating rows in a database, or converting data in
database rows to XML. A TableSet exposes two methods:
- processTableSet(), which processes configured SQLStatement(s) and returns
an Object[] as the result of the database interaction.
- processStatement(), which processes a single named, configured SQLStatement and returns
an Object[] as the result of the database interaction.
For example, a TableSet that is used to update a database might return an Integer
with a value that is equal to the number of rows that were updated while a
TableSet that executes a query might return a java.sql.ResultSet or a
com.sun.rowset.CachedRowSetImpl.
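Because the result is an untyped Object[], a caller has to inspect the element type before using it. The sketch below assumes that the interesting value is the first element of the array; describe() is a hypothetical helper, not part of the TableSet API. A CachedRowSetImpl is also a java.sql.ResultSet (javax.sql.RowSet extends ResultSet), so a single instanceof check covers both query result types mentioned above.

```java
import java.sql.ResultSet;

// Hypothetical helper showing how a caller might inspect a TableSet-style Object[] result.
class TableSetResultSketch {

    /** Returns a short description of the first element of the result array. */
    static String describe(Object[] result) {
        if (result == null || result.length == 0 || result[0] == null) {
            return "empty";
        }
        Object first = result[0];
        if (first instanceof Integer) {
            // An update typically reports the number of affected rows.
            return "update count: " + first;
        }
        if (first instanceof ResultSet) {
            // Covers plain ResultSets and RowSet implementations alike.
            return "query result";
        }
        return "other: " + first.getClass().getName();
    }
}
```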
Database poller scheduled jobs expect a <job-configuration>
ScheduledJobConfiguration element that contains a DatabasePollerConfiguration,
which includes an org.osesb.connectors.jdbc.TableSetConfiguration and an
org.osesb.msgrouting.configuration.ClientGatewayConfiguration.
For Quartz-based database pollers, databasePollerConfiguration is set by execute().
For Runnable ScheduledJob(s), databasePollerConfiguration is set by the constructor
that takes a ScheduledJobConfiguration as the parameter.
The TableSetConfiguration is used for processing database changes.
A TableSetConfiguration includes:
- SQLStatementListConfiguration, which contains a List of named,
org.osesb.utilities.jdbc.SQLStatement(s).
- A JDBCConfiguration for establishing database connections. Database
connections can be configured as DataSource(s) or as plain old JDBC connections.
The JDBCConfiguration should be configured to use the appropriate IConnectionManager.
- A TableSetAdapterConfiguration for a TableSetAdapter class that
performs TableSet processing.
The following TableSetAdapter classes are provided out of the box:
- SelectAndCheckpointProcessedRowsTableSetAdapter, which scans database tables
for changes and checkpoints processed rows by updating one or more columns
in the target table(s). Typically, SelectAndCheckpointProcessedRowsTableSetAdapter
configures a scrollable, updatable ResultSet where the
org.osesb.utilities.jdbc.SQLStatements are configured with
org.osesb.utilities.jdbc.SQLInputParameters and where the values of the
SQLInputParameters are given by a scripted variable. Scripted variables
are supported by the org.osesb.scriptedvariables package.
- SelectAndDeleteProcessedRowsTableSetAdapter, which extends SelectStatementTableSetAdapter
(which performs the table scan) and provides a checkpointProcessedRows() method that is
invoked to delete processed rows.
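For readers unfamiliar with scrollable, updatable ResultSets, the following sketch shows the plain JDBC calls involved in checkpointing rows by updating a column in place. It uses only standard java.sql APIs and is not the adapter's implementation; the class, method and column names are hypothetical, and whether a given select statement yields an updatable ResultSet depends on the JDBC driver.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;

// Plain JDBC sketch of checkpointing through an updatable ResultSet.
class UpdatableResultSetSketch {

    /** Writes 'when' into checkpointColumn for every remaining row; returns the row count. */
    static int checkpointRows(ResultSet rows, String checkpointColumn, Timestamp when)
            throws SQLException {
        int count = 0;
        while (rows.next()) {
            rows.updateTimestamp(checkpointColumn, when);
            rows.updateRow(); // requires a ResultSet opened with CONCUR_UPDATABLE
            count++;
        }
        return count;
    }

    /** Opens a scrollable, updatable ResultSet for selectSql and checkpoints its rows. */
    static int pollAndCheckpoint(Connection connection, String selectSql, String checkpointColumn)
            throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(
                     selectSql, ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE);
             ResultSet rows = statement.executeQuery()) {
            // A real poller would hand the rows to a gateway before checkpointing.
            return checkpointRows(rows, checkpointColumn, new Timestamp(System.currentTimeMillis()));
        }
    }
}
```

The try-with-resources block closes the ResultSet and the statement even when an update fails, which corresponds to the "releases JDBC resources" step of the poller.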
The ClientGatewayConfiguration configures an org.osesb.msgrouting.configuration.ClientGateway,
which is used to send messages to osESB. Typically, the ClientGateway
configures an org.osesb.messagepipeline.MessagePipeline with
org.osesb.messagepipeline.PipelineStage(s) that:
- Create a message formatted for osESB processing. Generally, this
is an XML message, possibly with attachments.
- Send the message to an osESB transport handler using SOAP or JMS.
In addition, DatabasePollerConfiguration supports two configuration properties
that limit the number of un-checkpointed rows processed by executeJob():
- The 'maxNumberOfItemsToProcess' property sets an upper limit on
the number of items processed by executeJob(). For example, if
this property is set to 5 and if there are 20 un-checkpointed
items when executeJob() is invoked, executeJob() will process,
at most, 5 items.
- The 'percentItemsToProcess' property determines the percentage of un-checkpointed
items processed by executeJob(). For example, if this property
is set to 10 and if there are 100 un-checkpointed items when
executeJob() is invoked, executeJob() will process, at most,
10 items. When both properties are set, the number of items processed
will not exceed 'maxNumberOfItemsToProcess'.
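The interaction of the two limits can be illustrated with a small calculation. This sketch rests on several assumptions that the text above does not confirm: a value of zero or less means that a property is not set, the percentage is rounded down, and the smaller of the two limits wins. PollLimitSketch and itemsToProcess() are hypothetical names.

```java
// Hypothetical calculation of how many un-checkpointed items one executeJob() call handles.
class PollLimitSketch {

    /**
     * @param pending                   number of un-checkpointed items currently available
     * @param maxNumberOfItemsToProcess absolute cap; values <= 0 are treated as "not set" (assumption)
     * @param percentItemsToProcess     percentage cap; values <= 0 are treated as "not set" (assumption)
     */
    static int itemsToProcess(int pending, int maxNumberOfItemsToProcess, int percentItemsToProcess) {
        int limit = pending;
        if (percentItemsToProcess > 0) {
            // Integer division rounds down; the rounding rule is an assumption.
            int byPercent = (int) ((long) pending * percentItemsToProcess / 100);
            limit = Math.min(limit, byPercent);
        }
        if (maxNumberOfItemsToProcess > 0) {
            limit = Math.min(limit, maxNumberOfItemsToProcess);
        }
        return limit;
    }
}
```

With the figures used above, itemsToProcess(20, 5, 0) yields 5 and itemsToProcess(100, 0, 10) yields 10; with both limits set, itemsToProcess(100, 5, 10) yields 5.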
It is difficult to design a set of database poller classes that can meet the
variety of requirements that one is likely to encounter. For example, suppose that
database rows are to be checkpointed using a timestamp column and a messageID column
where the messageID column is updated with the messageID of the message that is sent
to osESB via the configured ClientGateway. Further suppose that the database
ResultSet(s) are transformed to XML and that one message per row is sent to osESB.
In these circumstances, the built-in MessagePipeline stages that are in the
org.osesb.messagepipeline.builtinstages package need to be extended for the ClientGateway
that is configured by the database poller to send messages to osESB.
Because of the complexities in real-world database poller implementations, it was decided
to provide a default database poller class, DefaultDatabasePollerScheduledJob, that
uses a SelectAndCheckpointProcessedRowsTableSetAdapter and then provide examples
of other configurations in the org.osesb.scheduledjobs.databasepoller.test package.
- Since:
- Version .9
Field Summary
protected DatabasePollerConfiguration databasePollerConfiguration
- The DatabasePollerConfiguration for this DefaultDatabasePollerScheduledJob,
from which the TableSet is constructed.
static java.lang.String JOBDATA_MAP_KEY_NUMBEROFROWSPROCESSED_JOBRUN
- In addition to standard statistics tracked by ScheduledJob,
DefaultDatabasePollerScheduledJob tracks the number of rows
that were processed the last time the job was run.
static java.lang.String JOBDATA_MAP_KEY_NUMBEROFROWSPROCESSED_SINCEJOBSTARTED
- In addition to standard statistics tracked by ScheduledJob,
DefaultDatabasePollerScheduledJob tracks the number of rows
that were processed since the job was started.
protected TableSet tableSet
- The configured TableSet for this DefaultDatabasePollerScheduledJob,
which is constructed using the DatabasePollerConfiguration.
Fields inherited from class org.osesb.scheduledjobs.ScheduledJob
JOBDATA_MAP_KEY_JOBNAME, JOBDATA_MAP_KEY_JOBSTARTEDTIMESTAMP, JOBDATA_MAP_KEY_JOBTYPE, JOBDATA_MAP_KEY_LASTHEARTBEATTIMESTAMP, JOBDATA_MAP_KEY_LASTJOBTIMESTAMP, JOBDATA_MAP_KEY_NUMBEROFTIMESEXECUTEJOB, jobExecutionContext, jobPropertyMap, jobThread, jobThreadStopping, jobType, log, QUARTZ_JOB_TYPE, RUNNABLE_JOB_TYPE, scheduledJobConfiguration, thisClassName
Method Summary
void execute(org.quartz.JobExecutionContext context)
- org.quartz.Job interface.
void executeJob()
- Execute the ScheduledJob according to the ScheduledJobConfiguration.
void incrementJobStatistics()
- Increment job statistics.
boolean logHeartbeatMessage()
- Log the heartbeat message if configured by the
<heartbeatInterval> property in the
ScheduledJobConfiguration.
static void main(java.lang.String[] args)
- For testing.
void stop()
- Stop a Runnable DefaultDatabasePollerScheduledJob.
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
tableSet
protected TableSet tableSet
- The configured TableSet for this DefaultDatabasePollerScheduledJob,
which is constructed using the DatabasePollerConfiguration.
ScheduledJob(s) can be configured as Quartz Jobs or as Runnable(s).
For Quartz ScheduledJob(s), tableSet is constructed by execute().
For Runnable ScheduledJob(s), tableSet is constructed by the constructor
that takes a ScheduledJobConfiguration as the parameter.
databasePollerConfiguration
protected DatabasePollerConfiguration databasePollerConfiguration
- The DatabasePollerConfiguration for this DefaultDatabasePollerScheduledJob,
from which the TableSet is constructed.
ScheduledJob(s) can be configured as Quartz Jobs or as Runnable(s).
For Quartz ScheduledJob(s), databasePollerConfiguration is set by execute().
For Runnable ScheduledJob(s), databasePollerConfiguration is set by the constructor
that takes a ScheduledJobConfiguration as the parameter.
JOBDATA_MAP_KEY_NUMBEROFROWSPROCESSED_JOBRUN
public static final java.lang.String JOBDATA_MAP_KEY_NUMBEROFROWSPROCESSED_JOBRUN
- In addition to standard statistics tracked by ScheduledJob,
DefaultDatabasePollerScheduledJob tracks the number of rows
that were processed the last time the job was run.
This is the job property key for that count.
- See Also:
incrementJobStatistics(),
logHeartbeatMessage(),
Constant Field Values
JOBDATA_MAP_KEY_NUMBEROFROWSPROCESSED_SINCEJOBSTARTED
public static final java.lang.String JOBDATA_MAP_KEY_NUMBEROFROWSPROCESSED_SINCEJOBSTARTED
- In addition to standard statistics tracked by ScheduledJob,
DefaultDatabasePollerScheduledJob tracks the number of rows
that were processed since the job was started.
This is the job property key for that count.
- See Also:
incrementJobStatistics(),
logHeartbeatMessage(),
Constant Field Values
DefaultDatabasePollerScheduledJob
public DefaultDatabasePollerScheduledJob()
- Creates a new, uninitialized instance of DefaultDatabasePollerScheduledJob.
ScheduledJob(s) can be configured as Quartz Jobs or as Runnable(s).
This is the no-argument constructor used by the Quartz scheduler.
DefaultDatabasePollerScheduledJob
public DefaultDatabasePollerScheduledJob(ScheduledJobConfiguration scheduledJobConfiguration)
throws java.lang.Exception
- Creates a new, uninitialized instance of ScheduledJob.
ScheduledJob(s) can be configured as Quartz Jobs or as Runnable(s).
This is the constructor used to construct a Runnable ScheduledJob.
- Parameters:
scheduledJobConfiguration - The ScheduledJobConfiguration for this ScheduledJob.
- Throws:
java.lang.Exception - if constructed with a null scheduledJobConfiguration.
execute
public void execute(org.quartz.JobExecutionContext context)
throws org.quartz.JobExecutionException
- org.quartz.Job interface.
DefaultDatabasePollerScheduledJob overrides ScheduledJob#execute() with a
method that behaves as follows:
- execute() expects a ScheduledJobConfiguration to be passed as an entry in the
org.quartz.JobDataMap using ScheduledJobManager.JOBDATA_MAP_KEY_SCHEDULEDJOBCONFIGURATION
as the key. The
<job-configuration> ScheduledJobConfiguration element
must contain a DatabasePollerConfiguration, which includes an
org.osesb.connectors.jdbc.TableSetConfiguration and, optionally, an
org.osesb.msgrouting.configuration.ClientGatewayConfiguration.
- execute() expects a SelectAndCheckpointProcessedRowsTableSetAdapter to be configured by
the TableSetConfiguration.
- execute() processes the configured SQL select statement using a
SelectAndCheckpointProcessedRowsTableSetAdapter and returns the result
as an Object[] where the first element of the array is a java.sql.ResultSet.
By default SelectAndCheckpointProcessedRowsTableSetAdapter#processTableSet()
looks for a SQL statement named 'selectStatement' in the SQLStatementListConfiguration
portion of the TableSetConfiguration file. To specify a SQLStatement with a
different name, configure an adapter property named 'selectStatementName'.
- If DefaultDatabasePollerScheduledJob is configured with an
org.osesb.msgrouting.ClientGateway, execute() constructs an
EAIObjectMessage with the RowSet result of the SQL select statement
and invokes ClientGateway#sendEAIMessage().
- Typically, the ClientGateway is configured with a MessagePipeline
that transforms the RowSet to XML and sends an XML message to an
osESB transport handler over JMS or SOAP. See the
org.osesb.scheduledjobs.databasepoller.test package for example
configurations of a database poller scheduled job that accomplish this.
- After sending a message through the Gateway, execute() invokes
SelectAndCheckpointProcessedRowsTableSetAdapter#checkpointProcessedRows(),
which checkpoints processed rows.
- Finally, execute() releases JDBC resources.
- Specified by:
execute in interface org.quartz.Job
- Overrides:
execute in class ScheduledJob
- Parameters:
context - The Quartz JobExecutionContext.
- Throws:
org.quartz.JobExecutionException - if there is a problem completing the execute()
method.
- See Also:
Job.execute(org.quartz.JobExecutionContext)
executeJob
public void executeJob()
throws java.lang.Exception
- Execute the ScheduledJob according to the ScheduledJobConfiguration.
ScheduledJob(s) can be configured as Quartz Jobs or as Runnable(s).
When configured as a Quartz Job, org.quartz.Job#execute() invokes
executeJob(). When configured as a Runnable job, run() invokes
executeJob().
DefaultDatabasePollerScheduledJob overrides ScheduledJob#executeJob() with a
method that behaves as follows:
- executeJob() processes the configured SQL select statement using a
SelectAndCheckpointProcessedRowsTableSetAdapter and returns the result
as an Object[] where the first element of the array is a java.sql.ResultSet.
By default SelectAndCheckpointProcessedRowsTableSetAdapter#processTableSet()
looks for a SQL statement named 'selectStatement' in the SQLStatementListConfiguration
portion of the TableSetConfiguration file. To specify a SQLStatement with a
different name, configure an adapter property named 'selectStatementName'.
- If DefaultDatabasePollerScheduledJob is configured with an
org.osesb.msgrouting.ClientGateway, executeJob() constructs an
EAIObjectMessage with the RowSet result of the SQL select statement
and invokes ClientGateway#sendEAIMessage().
- Typically, the ClientGateway is configured with a MessagePipeline
that transforms the RowSet to XML and sends an XML message to an
osESB transport handler over JMS or SOAP. See the
org.osesb.scheduledjobs.databasepoller.test package for example
configurations of a database poller scheduled job that accomplish this.
- After sending a message through the Gateway, executeJob() invokes
SelectAndCheckpointProcessedRowsTableSetAdapter#checkpointProcessedRows(),
which checkpoints processed rows.
- Finally, executeJob() releases JDBC resources.
- Overrides:
executeJob in class ScheduledJob
- Throws:
java.lang.Exception - if there is a problem completing the executeJob()
method. The exception may be an org.quartz.JobExecutionException.
stop
public void stop()
- Stop a Runnable DefaultDatabasePollerScheduledJob.
ScheduledJob(s) can be configured as Quartz Jobs or as Runnable(s).
When configured as a Runnable job, invoke stop() to stop the job.
- Overrides:
stop in class ScheduledJob
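A common way to make a Runnable job stoppable is a volatile flag combined with an interrupt, sketched below. This is an illustration of the general technique, not the osESB implementation: the inherited jobThread and jobThreadStopping fields suggest a similar approach, but their exact use is not documented here. StoppableJobSketch and its 10 ms polling interval are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Generic sketch of a Runnable polling loop that can be stopped from another thread.
class StoppableJobSketch implements Runnable {

    private volatile boolean stopping = false; // written by stop(), read by run()
    private volatile Thread worker;            // the thread currently executing run()
    final AtomicInteger cycles = new AtomicInteger();

    @Override
    public void run() {
        worker = Thread.currentThread();
        while (!stopping) {
            cycles.incrementAndGet(); // a real job would call executeJob() here
            try {
                Thread.sleep(10);     // wait for the next polling interval
            } catch (InterruptedException e) {
                // stop() interrupts the sleep; the loop condition then ends the job.
            }
        }
    }

    /** Asks the loop to finish and wakes it up if it is currently sleeping. */
    public void stop() {
        stopping = true;
        Thread t = worker;
        if (t != null) {
            t.interrupt();
        }
    }
}
```

The flag guarantees that the loop ends even when stop() is called before run() has started; the interrupt only shortens the wait.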
incrementJobStatistics
public void incrementJobStatistics()
- Increment job statistics.
- Overrides:
incrementJobStatistics in class ScheduledJob
- See Also:
logHeartbeatMessage()
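The two row counters can be pictured as entries in a job property map: one is overwritten on every run, the other accumulates. In the sketch below a plain java.util.Map stands in for the job property map, and the key strings are hypothetical because the actual constant values are not shown on this page.

```java
import java.util.Map;

// Sketch of maintaining a per-run counter and a cumulative counter in a property map.
class RowStatisticsSketch {

    // Hypothetical key values; the real constants are defined by DefaultDatabasePollerScheduledJob.
    static final String ROWS_LAST_RUN = "numberOfRowsProcessedJobRun";
    static final String ROWS_SINCE_START = "numberOfRowsProcessedSinceJobStarted";

    /** Records the outcome of one job run. */
    static void recordRun(Map<String, Object> jobProperties, int rowsThisRun) {
        // The per-run value is simply replaced.
        jobProperties.put(ROWS_LAST_RUN, rowsThisRun);

        // The cumulative value starts at zero and grows with every run.
        Object previous = jobProperties.get(ROWS_SINCE_START);
        long total = (previous instanceof Number) ? ((Number) previous).longValue() : 0L;
        jobProperties.put(ROWS_SINCE_START, total + rowsThisRun);
    }
}
```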
logHeartbeatMessage
public boolean logHeartbeatMessage()
- Log the heartbeat message if configured by the
<heartbeatInterval> property in the
ScheduledJobConfiguration.
- Overrides:
logHeartbeatMessage in class ScheduledJob
- Returns:
- true if the heartbeat message was logged, false otherwise.
A heartbeat message is logged if the
<heartbeatInterval>
is configured and if the configured <heartbeatInterval> has
elapsed.
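The return value rule can be sketched as a simple elapsed-time check. The sketch assumes that the interval is expressed in milliseconds and that a value of zero or less means no heartbeat is configured; neither detail is stated above. HeartbeatSketch and logHeartbeatIfDue() are hypothetical names.

```java
// Sketch of an interval-based heartbeat check; units and "not configured" value are assumptions.
class HeartbeatSketch {

    private final long intervalMillis;  // <= 0 means no heartbeat is configured (assumption)
    private long lastHeartbeatMillis;

    HeartbeatSketch(long intervalMillis, long startMillis) {
        this.intervalMillis = intervalMillis;
        this.lastHeartbeatMillis = startMillis;
    }

    /** Logs a heartbeat and returns true if the interval has elapsed, false otherwise. */
    boolean logHeartbeatIfDue(long nowMillis) {
        if (intervalMillis <= 0) {
            return false; // not configured
        }
        if (nowMillis - lastHeartbeatMillis < intervalMillis) {
            return false; // interval has not elapsed yet
        }
        System.out.println("heartbeat at " + nowMillis);
        lastHeartbeatMillis = nowMillis;
        return true;
    }
}
```

Passing the current time as a parameter keeps the check deterministic; a caller would typically supply System.currentTimeMillis().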
main
public static void main(java.lang.String[] args)
- Entry point for testing.
- Parameters:
args - command-line arguments