9 Migrating Data to a Sharded Database

You can migrate data from a non-sharded database to an Oracle Sharding sharded database using the methods described here.

These data loading methods assume that you are using a non-sharded Oracle database at the time you want to migrate to a sharded one. The proposed methods also apply to migrating data from other database systems, as well as to first-time database users.

9.1 About Migrating Data to a Sharded Database

After Oracle Sharding software installation, and sharded database configuration and creation, you can migrate your data to a sharded database.

The following are the high level steps to migrate to a sharded database environment.

  1. Design and create the sharded database schema.
  2. Migrate the data.
  3. Migrate the application.

See Also:

Application Suitability for Sharding to familiarize yourself with the constraints on migrating applications to a sharded database.

9.2 General Guidelines for Loading Data into a Sharded Database

Transitioning from a non-sharded to a sharded database involves moving the data from non-sharded tables to sharded and duplicated tables. Moving data from non-sharded tables to duplicated tables does not introduce any complexity, but moving data from non-sharded tables to sharded tables requires special attention.

Loading Data into Duplicated Tables

Loading data into a duplicated table can be accomplished using any existing database tool: Data Pump, SQL*Loader, or plain SQL. The data must be loaded using the shard catalog (coordinator) database node; in other words, the entire contents of the duplicated table are contained in the shard catalog database. Because the contents of the duplicated table are fully replicated to the database shards using materialized views, loading a duplicated table may take longer than loading the same data into a non-sharded table.
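The following is a brief, hypothetical sketch of loading a few rows into the StockItems duplicated table with plain SQL; it assumes a session connected to the shard catalog (coordinator) database, and the values shown are illustrative only.

-- connected to the shard catalog (coordinator) database
INSERT INTO StockItems (StockNo, Description, Price) VALUES (2001, 'Ball Bearing', 12.50);
INSERT INTO StockItems (StockNo, Description, Price) VALUES (2002, 'Gear Shaft', 47.00);
COMMIT;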

Figure 9-1 Loading Duplicated Tables



Loading Data into Sharded Tables

When loading a sharded table, each database shard accommodates a distinct subset (shard) of the entire data set, so you must split (shuffle) the data before loading each subset to a particular shard.

Figure 9-2 Loading Sharded Tables Using the Shard Catalog



You must use the Oracle Data Pump utility to load the data across database shards in subsets. Consider the following two options:

  • Load the data through the sharding coordinator (catalog) node, as illustrated above.
  • Load the data directly to the database shards, as illustrated below.

Figure 9-3 Loading Sharded Tables Directly to the Database Shards



Loading the data into a sharded database using the sharding coordinator is slower than loading the entire data set into a non-sharded table, because of the splitting logic running on the sharding coordinator (catalog) node and additional overhead of pushing the data to the shards.

Loading the data directly into the database shards is much faster, because each shard is loaded separately. That is, by running Data Pump on each shard, you can complete the data loading operation within the period of time needed to load the shard with the maximum subset of the entire data set. On average, the loading time can be approximated as the time needed to load the entire data set into a non-sharded database, divided by the number of shards in the sharded database.

Rather than relying on the Oracle Data Pump utility to split the load data set into distinct subsets, you can use an open source shard splitting library that integrates the splitting (shuffling) logic into your application. The shard splitting library source code, as well as sample usage, is available in the Oracle Sharding Tools Library at https://github.com/oracle/db-sharding/. Based on this shard splitting library, Oracle is developing a generic streaming load library for use in Oracle Cloud. At this time the streaming load library is only available upon request.

9.3 Migrating the Schema

Before the existing database can be migrated to the sharded database, you must decide how to organize the sharded database, including the number of shards and the replication strategy, and you must decide which tables in the application are sharded and which tables are duplicated tables. For the sharded tables, you must decide the sharding method as well as the parent-child relationships between the sharded tables in the table family.

The schema migration to a sharded database environment is illustrated using a sample application, which is defined by a data model and a set of imposed constraints. Sample program code is used to analyze how migration to a sharded database affects the application. The following figure shows the sample application data model.

The data model comprises four tables: Customers, Orders, StockItems, and LineItems, and the model enforces the following primary key constraints.

  • Customers.(CustNo)

  • Orders.(PONo)

  • StockItems.(StockNo)

  • LineItems.(LineNo, PONo)

The data model defines the following referential integrity constraints.

  • Customers.CustNo -> Orders.CustNo

  • Orders.PONo -> LineItems.PONo

  • StockItems.StockNo -> LineItems.StockNo

The following DDL statements create the sample application database schema definitions:

CREATE TABLE Customers (
 CustNo     NUMBER(3) NOT NULL,
 CusName    VARCHAR2(30) NOT NULL,
 Street     VARCHAR2(20) NOT NULL,
 City       VARCHAR2(20) NOT NULL,
 State      CHAR(2) NOT NULL,
 Zip        VARCHAR2(10) NOT NULL,
 Phone      VARCHAR2(12),
 PRIMARY KEY (CustNo)
);

CREATE TABLE Orders (
 PoNo       NUMBER(5),
 CustNo     NUMBER(3) REFERENCES Customers,
 OrderDate  DATE,
 ShipDate   DATE,
 ToStreet   VARCHAR2(20),
 ToCity     VARCHAR2(20),
 ToState    CHAR(2),
 ToZip      VARCHAR2(10),
 PRIMARY KEY (PoNo)
);

CREATE TABLE StockItems (
 StockNo     NUMBER(4) PRIMARY KEY,
 Description VARCHAR2(20),
 Price       NUMBER(6,2)
);

CREATE TABLE LineItems (
 LineNo      NUMBER(2),
 PoNo        NUMBER(5) REFERENCES Orders,
 StockNo     NUMBER(4) REFERENCES StockItems,
 Quantity    NUMBER(2),
 Discount    NUMBER(4,2),
 PRIMARY KEY (LineNo, PoNo)
);

Sharding Key

Sharding is a database scaling technique based on horizontal partitioning of data across multiple independent Oracle databases. Database requests are routed to the appropriate shard based on the value of the sharding key column. The sharding design goal is to select a sharding key that maximizes single-shard operations and minimizes or eliminates cross-shard operations.
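To make the distinction concrete, the following hypothetical queries against the sample schema assume CustNo is the sharding key; the literal values are illustrative only.

-- Single-shard operation: the sharding key is supplied in the predicate,
-- so the request is routed to exactly one shard.
SELECT * FROM Orders WHERE CustNo = 123;

-- Cross-shard operation: no sharding key in the predicate,
-- so all shards must participate.
SELECT COUNT(*) FROM Orders WHERE OrderDate > DATE '2020-01-01';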

Based on the primary key to foreign key functional dependencies identified in the sample application data model, the following table family is formed.

  • Customers – parent table

  • Orders – child table

  • Lineitems – grandchild table

The remaining StockItems table is simply a lookup table mapping stock item number to stock item description and price (StockNo -> (Description, Price)).

Sharded database definitions require the following table DDL statements for members of the table family using reference partitioning, plus the additional DDL statement defining the StockItems lookup table:

CREATE SHARDED TABLE Customers (
 CustNo     NUMBER(3) NOT NULL,
 CusName    VARCHAR2(30) NOT NULL,
 Street     VARCHAR2(20) NOT NULL,
 City       VARCHAR2(20) NOT NULL,
 State      CHAR(2) NOT NULL,
 Zip        VARCHAR2(10) NOT NULL,
 Phone      VARCHAR2(12),
 CONSTRAINT RootPK PRIMARY KEY (CustNo)
)
PARTITION BY CONSISTENT HASH (CustNo)
PARTITIONS AUTO
TABLESPACE SET ts1
;

CREATE SHARDED TABLE Orders (
 PoNo       NUMBER(5) NOT NULL,
 CustNo     NUMBER(3) NOT NULL,
 OrderDate  DATE,
 ShipDate   DATE,
 ToStreet   VARCHAR2(20),
 ToCity     VARCHAR2(20),
 ToState    CHAR(2),
 ToZip      VARCHAR2(10),
 CONSTRAINT OrderPK PRIMARY KEY (CustNo, PoNo),
 CONSTRAINT CustFK FOREIGN KEY (CustNo) REFERENCES Customers (CustNo)
)
PARTITION BY REFERENCE (CustFK)
;

CREATE DUPLICATED TABLE StockItems (
 StockNo     NUMBER(4) PRIMARY KEY,
 Description VARCHAR2(20),
 Price       NUMBER(6,2)
);

CREATE SHARDED TABLE LineItems (
 CustNo      NUMBER(3) NOT NULL,
 LineNo      NUMBER(2) NOT NULL,
 PoNo        NUMBER(5) NOT NULL,
 StockNo     NUMBER(4) REFERENCES StockItems,
 Quantity    NUMBER(2),
 Discount    NUMBER(4,2),
 CONSTRAINT LinePK PRIMARY KEY (CustNo, LineNo, PoNo),
 CONSTRAINT LineFK FOREIGN KEY (CustNo, PoNo) REFERENCES Orders (CustNo, PoNo)
)
PARTITION BY REFERENCE (LineFK)
;

After comparing the sharded database DDL, shown above, to the original (non-sharded) database tables, you notice the following structural table changes.

  • The CREATE TABLE statement for tables in the table family includes the additional SHARDED keyword.

  • The CREATE TABLE statement for the lookup table includes the additional keyword DUPLICATED.

  • All tables in the table family contain the sharding key column, CustNo, as the leading column of the primary key. This is the sharding-specific de-normalization, which expands a composite primary key at every level of the table family hierarchy to include the immediate parent key, also known as the level key, as the leading component.

  • Sharded tables are partitioned by the sharding key column. In this particular case, the root table of the family is partitioned by CONSISTENT HASH. This partitioning scheme propagates to the lower hierarchy levels BY REFERENCE (reference partitioning). Data partitioning by CONSISTENT HASH is called the system-managed sharding method (as opposed to user-defined sharding).

  • In system-managed sharding, tablespace sets are defined for sharded tables. A tablespace set is used in a sharded database as a logical storage unit for one or more sharded tables and indexes. A tablespace set consists of multiple tablespaces distributed across shards in a shardspace. The database automatically creates the tablespaces in a tablespace set; their number is determined automatically and is equal to the number of chunks in the corresponding shardspace. The following statement creates the tablespace set used for the sharded tables in this example.

    CREATE TABLESPACE SET ts1;

    In our example, Customers, Orders, and LineItems are placed in tablespace set ts1. That means that the corresponding partitions of the three tables in the table family are stored in the same tablespace set, ts1 (partition by reference). However, it is possible to specify a separate tablespace set for each table, as sketched in the example following this list.
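The following minimal sketch shows how separate tablespace sets could be created; the tablespace set names (ts_orders, ts_lineitems) are illustrative assumptions, and each would then be referenced in the TABLESPACE SET clause of the corresponding CREATE SHARDED TABLE statement.

CREATE TABLESPACE SET ts_orders;
CREATE TABLESPACE SET ts_lineitems;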

9.4 Preparing the Source Database

To make the transition to a sharded database schema smoother, you can modify the source, non-sharded database so that it matches the table definitions in the target sharded database.

Ideally, the table definitions (table name, column names, and their data types) in the target sharded database and the table definitions in the source database are exactly the same. However, as part of the transition to a sharded database, you might need to modify the table definitions for use in the sharded database. If that is the case, you can modify the source, non-sharded database so that it matches the table definitions in the new sharded database. Depending on the extent of the changes, this can also require changes to the application code. By modifying the source database schema so that it matches the target sharded database schema ahead of the migration process, you create the conditions for an uninterrupted transition from the original non-sharded database to the new sharded database. These preparations are prerequisites for minimum downtime, if any downtime is incurred at all. Also, as illustrated by the example application, the preparation for migration is a seamless and easily reversible process. The activities you undertake to prepare the source database are highly desirable, but not required. If you are, for whatever reason, not able to modify your source database operating environment, you can skip this topic.

The steps shown here follow the same sample schema that was defined in the previous topic.

In order to migrate the sample database, you must add the sharding key column, CustNo, to the LineItems table in the source database using ALTER TABLE, as shown in this example.

ALTER TABLE LineItems ADD (CustNo NUMBER(3));

With this additional column, the row data layout in the source table and the desired layout in the target sharded table are identical. Now you are ready to populate this new column with the matching data. Values for the additional sharding key column, CustNo, in the LineItems table must be derived from the Orders table by joining Orders with LineItems on their parent-child relationship.

SELECT Orders.CustNo FROM Orders JOIN LineItems ON Orders.PONo = LineItems.PONo;

In this example, the MERGE statement is used to populate the CustNo column, although you could also use standard SQL statements to accomplish the same goal. The MERGE statement looks as follows.

SQL> BEGIN
  2  MERGE INTO LineItems l
  3  USING Orders o
  4  ON (l.PONo = o.PONo)
  5  WHEN MATCHED THEN
  6  UPDATE SET l.CustNo = o.CustNo;
  7  END;
  8  /

You may discover at this point that there is referential integrity to be maintained for the CustNo column. To make sure the new column is populated correctly, add a NOT NULL constraint after executing the MERGE statement, as shown here.

ALTER TABLE LineItems MODIFY CustNo NOT NULL;

By running the above MERGE statement you bring the LineItems table row layout and the row data to the desired state. The additional CustNo column allows the LineItems table to be sharded the same way as the root of the table family (Customers). You might consider making this change one of the last actions before the actual migration; otherwise, you must maintain this new column within your application, and consequently you must also maintain the referential integrity for the added sharding key column in your existing database. The referential integrity constraint for the matching CustNo columns is defined on the LineItems table as shown here.

ALTER TABLE LineItems ADD CONSTRAINT LineFk FOREIGN KEY (CustNo, PONo) REFERENCES Orders (CustNo, PONo);

Before changing the referential integrity constraint on the LineItems table, you must drop the existing FOREIGN KEY constraint referencing the Orders table. This can be done safely by enclosing the DROP CONSTRAINT and the subsequent ADD CONSTRAINT statements between ALTER TABLE LineItems READ ONLY; and ALTER TABLE LineItems READ WRITE;, or simply by locking the table with LOCK TABLE LineItems IN SHARE MODE for the duration of the constraint modifications.

As a result of adding the CustNo column as a part of the foreign key definition in the LineItems table, you must modify the primary key on the Orders table. Changing the primary key, in turn, requires rebuilding indexes, and this may take some time to complete. This effort makes sense only if you plan to run your application against this new schema for some period of time before migrating to the sharded database.

The following example illustrates changing the LineItems and Orders schemata as a result of adding the sharding key to the LineItems table. Prior to dropping the existing foreign key constraint on the LineItems table, and primary key constraint on the Orders table, you must retrieve the respective constraint names as shown here.

SQL> SELECT a.table_name, a.column_name, a.constraint_name
  2  FROM ALL_CONS_COLUMNS A, ALL_CONSTRAINTS C
  3  WHERE A.CONSTRAINT_NAME = C.CONSTRAINT_NAME
  4  and a.table_name='LINEITEMS' and C.CONSTRAINT_TYPE = 'R';

TABLE_NAME   COLUMN_NAME   CONSTRAINT_NAME
----------   -----------   ---------------
LINEITEMS    PONO          SYS_C009087
LINEITEMS    STOCKNO       SYS_C009088

SQL> SELECT cols.table_name, cols.column_name, cols.constraint_name, cols.position
  2  FROM all_constraints cons, all_cons_columns cols
  3  WHERE cons.constraint_type = 'P'
  4  AND cons.constraint_name = cols.constraint_name
  5  AND cols.table_name = 'ORDERS'
  6  ORDER BY cols.table_name, cols.position;

TABLE_NAME   COLUMN_NAME   CONSTRAINT_NAME   POSITION
----------   -----------   ---------------   --------
ORDERS       ORDER_ID      ORDER_PK                 1
ORDERS       PONO          SYS_C009148              1

SQL> ALTER TABLE LineItems READ ONLY;

Table altered.

SQL> ALTER TABLE Orders READ ONLY;

Table altered.

SQL> ALTER TABLE LineItems DROP CONSTRAINT SYS_C009087;

Table altered.

SQL> ALTER TABLE ORDERS DROP CONSTRAINT SYS_C009148;

Table altered.

SQL> ALTER TABLE ORDERS ADD CONSTRAINT order_pk PRIMARY KEY (CustNo, PONo);

Table altered.

SQL> ALTER TABLE LineItems ADD CONSTRAINT LineFk FOREIGN KEY (CustNo, PONo) REFERENCES Orders (CustNo, PONo);

Table altered.

SQL> ALTER TABLE Orders READ WRITE;

Table altered.

SQL> ALTER TABLE LineItems READ WRITE;

Table altered.

Similarly, you should extend the PRIMARY KEY definition for the LineItems table to a full level key by including CustNo as the leading column, as shown here.

ALTER TABLE LineItems ADD CONSTRAINT LinePK PRIMARY KEY (CustNo, PONo, LineNo);

Again, you must drop the existing PRIMARY KEY constraint before introducing the new one. To preserve the data integrity, modify the PRIMARY KEY and FOREIGN KEY constraints using one of the two transaction isolation strategies suggested earlier. In the following example, the LineItems table is locked while the constraint modifications take place. Afterward, COMMIT releases the lock.

SQL> SELECT cols.table_name, cols.column_name, cols.constraint_name, cols.position
  2  FROM all_constraints cons, all_cons_columns cols
  3  WHERE cons.constraint_type = 'P'
  4  AND cons.constraint_name = cols.constraint_name
  5  AND cols.table_name = 'LINEITEMS'
  6  ORDER BY cols.table_name, cols.position;

TABLE_NAME   COLUMN_NAME   CONSTRAINT_NAME   POSITION
----------   -----------   ---------------   --------
LINEITEMS    LINENO        SYS_C009086              1
LINEITEMS    PONO          SYS_C009086              2

SQL> LOCK TABLE LineItems IN SHARE MODE;

Table(s) Locked.

SQL> ALTER TABLE LINEITEMS DROP CONSTRAINT SYS_C009086;

Table altered.

SQL> ALTER TABLE LineItems ADD CONSTRAINT LinePK PRIMARY KEY (CustNo, PONo, LineNo);

Table altered.

SQL> COMMIT;

Commit complete.

The referential integrity related modifications are optional. The proposed modifications bring the source database very close to resembling the sharded target database. This further facilitates the transition process.

In some cases, referential integrity cannot be imposed or is undesirable to create. In that case, reference partitioning cannot be defined, and you can use the PARENT clause instead.
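The following is a minimal sketch of a LineItems definition using the PARENT clause, assuming the foreign key to Orders cannot be created; the column definitions follow the sample schema, and the exact syntax should be verified against your Oracle Database release.

CREATE SHARDED TABLE LineItems (
 CustNo      NUMBER(3) NOT NULL,
 LineNo      NUMBER(2) NOT NULL,
 PoNo        NUMBER(5) NOT NULL,
 StockNo     NUMBER(4),
 Quantity    NUMBER(2),
 Discount    NUMBER(4,2),
 CONSTRAINT LinePK PRIMARY KEY (CustNo, LineNo, PoNo)
)
PARENT Customers
PARTITION BY CONSISTENT HASH (CustNo)
PARTITIONS AUTO
TABLESPACE SET ts1;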

Finally, the additional CustNo column in the LineItems table might affect the existing queries, such as SELECT * FROM LineItems. To avoid this problem you can modify the CustNo column to become invisible, as shown here.

SQL> ALTER TABLE LineItems MODIFY CustNo INVISIBLE;
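Because CustNo is now invisible, existing queries that do not name the column behave as before, while the column can still be referenced explicitly. The following brief sketch illustrates the effect; the predicate value is illustrative only.

-- CustNo is not returned because invisible columns are excluded from SELECT *
SELECT * FROM LineItems WHERE PONo = 100;

-- The column can still be referenced explicitly
SELECT CustNo, PONo, LineNo FROM LineItems WHERE PONo = 100;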

With these modifications to the source database tables, you have prepared the existing sample database for the migration.

9.5 Preparing the Target Sharded Database

Before you start data migration to the sharded database, you must create the sharded database schema according to your design.

The data migration from a non-sharded to a sharded database environment can be accomplished in two distinct ways:

  • Two-step approach: This is a more cautious, incremental approach to data migration. Create a sharded database with only one shard. As long as your sharded database is contained within one shard, your application, as well as your database maintenance procedures, can be used without application code changes, or with a negligible amount of related modifications. In other words, upon migration to the sharded environment your sharded database behaves the same way as a non-sharded database. Then, once you modify your applications and operating procedures for sharding, you can proceed with scaling out the database to the appropriate number of shards.
  • Single-step approach: Create a sharded database with the appropriate number of shards initially. In this case your application and operating procedures should be fully prepared for sharding operations upon migration.

The more cautious, two-step approach allows a smooth, but significantly longer, transition to a sharded database environment. Running your application against a single shard gives you time to gradually modify your application for direct routing. Only after you modify your existing applications to use the shard director to connect to the correct shard can the remaining shards be instantiated.

The first step of the process is creating a sharded database with only one shard. Then you modify your existing application as suggested in one of the following sections. The last step is to scale out your sharded database to the number of shards you need. This method also provides an opportunity to split and rebalance the data chunks across all of the shards as you scale out.

This two-step migration approach not only requires more time, but it also requires more space. If you migrate all of your data to a single shard, you can load your sharded table data directly to the single shard without restrictions. After you scale out to multiple shards, you should strictly use the Data Pump utility if you want to correctly load sharded tables directly into multiple database shards. Duplicated tables reside in the catalog database, and you should always load duplicated tables using the catalog database.

Whether you decide to use the one-step or two-step approach, you must export your data and create your sharded database schema before you load the data into the sharded database. In the examples below, it is assumed that the software installation is complete and that you have created a sharded database environment, including at least a sharding catalog and one or more databases for the shards. You can create a new shard catalog or use an existing one. To illustrate the migration process, the examples in this procedure use the following database instances:

  • orignode – site hosting the original, non-sharded database instance, SID=orig

  • catnode – catalog node hosting shard catalog database instance, SID=ctlg

  • shrdnodeN – shard node(s) hosting the database shard instance(s), SID=shrdN, where N could be 1, 2, and so on

  • gsmnode – node hosting the shard director (GSM) instance, SID=gsm1

Whether you have modified the source database in preparation for migration or not, your migration process requires modifications to DDL definitions including, at least, the CREATE SHARDED TABLE and CREATE DUPLICATED TABLE statements. In order to migrate the database schema to the target sharded database, you must extract the DDL definitions from your source database and modify the SHARDED and DUPLICATED table metadata. A convenient way to extract the DDL statements from the source database is to create a Data Pump extract file. Then use the Data Pump import utility against the database export file, as shown here.

impdp uname/pwd@orignode directory=expdir dumpfile=sample.dmp sqlfile=sample_ddl.sql

In this example, the impdp command does not actually perform an import of the contents of the dump file. Rather, the sqlfile parameter triggers the creation of a script named sample_ddl.sql which contains all of the DDL from within the export dump file. This database export file is created as shown here.

expdp uname/pwd@orignode full=Y directory=expdir dumpfile=sample.dmp logfile=sample.log

The full database export file has the entire database contents: data and metadata. Exporting the entire database may take a long time. If you want to do this step quickly, you can export only the metadata, or only the part containing the set of tables you are interested in, as shown in this example.

expdp uname/pwd directory=DATA_PUMP_DIR dumpfile=sample_mdt.dmp logfile=sample_mdt.log INCLUDE=TABLE:\"IN \( \'CUSTOMERS\', \'ORDERS\', \'STOCKITEMS\', \'LINEITEMS\' \) \" CONSISTENT=Y CONTENT=METADATA_ONLY

Trimming down the export in this way more efficiently captures a consistent image of the database metadata without a possibly lengthy database data dump process. You still must get the DDL statements in text format and perform the DDL modifications as required by your sharded database schema design. If you decided to export the entire database, you would likely also use it as an input for loading your data.

Data Pump provides a secure way to transport your data. The database administrator has to authorize the database user for the required access to the database export directory, as shown here.

CREATE OR REPLACE DIRECTORY expdir AS '/some/directory';
GRANT READ, WRITE ON DIRECTORY expdir TO uname;
GRANT EXP_FULL_DATABASE TO uname;

With a full database export, the database administrator must grant the EXP_FULL_DATABASE role to the user, uname. No additional role is required for a table-level export. For more information about the Data Pump utility, see Oracle Database Utilities.

If you modified your source (non-sharded) database so that the row layout matches the target (sharded) database, no full database or table level export is required. The data can be efficiently transferred as is, without an intermediate dump file.

After finalizing your sharded database schema, run the prepared DDL against the sharding catalog database (ctlg) using database administrator credentials. All DDL statements must be executed in a session with the SHARD DDL setting enabled to ensure that all DDL statements are propagated from the catalog database (ctlg) to the shard databases (shrd1, 2, ..., N).

ALTER SESSION ENABLE SHARD DDL;
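For example, a minimal sketch of the complete sequence on the shard catalog follows; it assumes the edited DDL was saved to a script file, whose name (sample_ddl_sharded.sql) is an assumption.

-- connected to the shard catalog database (ctlg) as a database administrator
ALTER SESSION ENABLE SHARD DDL;

-- run the edited DDL script (the file name is illustrative)
@sample_ddl_sharded.sql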

With the sharded and duplicated tables defined, your sharded database is ready for data loading. It is recommended that you validate the sharding configuration using the GDSCTL VALIDATE command, before loading the data.

gdsctl validate

After successful validation your sharded database is ready for data loading. If you see inconsistencies or errors, you must correct the problem using the GDSCTL commands SHOW DDL and RECOVER.

Data Pump export utility files are, by default, consistent on a per table basis. If you want all of the tables in the export to be consistent to the same point in time, you must use the FLASHBACK_SCN or FLASHBACK_TIME parameters. Having a consistent “as of” point in time database export file is recommended. This is especially important if you opt for uninterrupted migration from a non-sharded to a sharded database, that is, if you want to provide continuous database operations during the migration. Migration to a sharded database using Data Pump during continuous operations is complemented with Oracle GoldenGate. An export command producing a consistent database snapshot would look like the following.

expdp uname/pwd@orignode full=Y directory=expdir dumpfile=sample.dmp logfile=sample.log CONSISTENT=Y FLASHBACK_TIME=SYSTIMESTAMP

The consistent snapshot database image requires the additional CONSISTENT or FLASHBACK_TIME parameter. When you run the command, you will notice that both parameters, CONSISTENT and FLASHBACK_TIME, have the same effect. Note that the timestamp is converted to a system change number (SCN), as shown here.

SQL> SELECT TIMESTAMP_TO_SCN(SYSTIMESTAMP) FROM dual; 
TIMESTAMP_TO_SCN(SYSTIMESTAMP)
------------------------------
                       1559981

If you prefer using FLASHBACK_SCN over FLASHBACK_TIME, you can obtain the current SCN by selecting it from V$DATABASE as shown here.

SQL> SELECT current_scn FROM v$database;
CURRENT_SCN
-----------
    1560005

Alternatively, you can declare it as shown here.

SQL> SET SERVEROUTPUT ON
SQL> DECLARE SCN NUMBER;
  2  BEGIN
  3  SCN := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER;
  4  DBMS_OUTPUT.PUT_LINE(SCN);
  5  END;
  6  /
1560598

You might need to ask a database administrator for authorization to access DBMS_FLASHBACK.
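For example, a database administrator might grant the required access as shown in this sketch, assuming the export user is uname.

GRANT EXECUTE ON DBMS_FLASHBACK TO uname;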

You can make Data Pump run faster by using the PARALLEL parameter. This parameter should be used in conjunction with the %U wildcard in the DUMPFILE parameter to allow multiple dump files to be created, as shown in this example.

expdp uname/pwd@orignode full=Y directory=expdir dumpfile=samp_%U.dmp logfile=samp.log CONSISTENT=Y PARALLEL=3 

The above command uses three parallel workers and creates three dump files suffixed with _01, _02, and _03. The same wildcard can be used during the import to allow you to reference multiple input files. Note that the three dump files, samp_01.dmp, samp_02.dmp, and samp_03.dmp, are created by the parallel export, rather than the single output file, sample.dmp, created in a previous example. Also, the elapsed time of the parallel export is less than a third of the elapsed time with serial execution, that is, with a single dump file output.

9.6 Migrating Your Data

After you create the target sharded database with a single shard, or multiple shards, and you have your sharded and duplicated tables defined, you can start migrating your data into the sharded database.

Make sure you understand the following data loading considerations before you start migrating your data from your source to the sharded database:

  • Differences between migrating duplicated and sharded tables

    Duplicated tables reside in the shard catalog; they are always loaded into the shard catalog database using any of the available data loading utilities, or plain SQL. You have two options for loading sharded tables, however. The sharded tables can be loaded using the sharding coordinator (catalog), or they can be loaded directly into the shards using the Data Pump utility.

  • Migrating sharded tables using the coordinator or direct loading to the shards

    Loading your sharded tables directly to the shard databases is always faster because you can load multiple shards simultaneously.

  • Migrating to multiple shards or migrating to a single shard

    Migrating a non-sharded database to a sharded database with a single shard does not require major changes to your application and database maintenance procedures. You can continue with your current operations largely unchanged until you are ready to split the data into shards. However, migrating a non-sharded database to a sharded database with multiple shards involves non-trivial preparation, with modifications to the application source code being the most time consuming prerequisite. If you intend to migrate to a sharded database with a single shard, you must also set up a plan to scale your database to multiple shards, that is, to distribute the database chunks across multiple shards, at some later time. Meanwhile, you can work on the application and other changes required to run with a sharded database with multiple shards.

  • Migration with downtime or uninterrupted migration

    Uninterrupted migration from a non-sharded to a sharded database with only one shard is much easier and simpler than migration to a sharded database with multiple shards. The subsequent scale-out from one shard to multiple shards is performed while the database is running.

    If you want to migrate from your non-sharded database to the target multi-shard database in a single step, it is strongly recommended that you try this out in a test environment first, and start migrating your production environment only after you make sure that the test environment migrates without issues.

Whether the target row data layout is prepared in the source database or not, there are various methods to choose from for efficient data migration. Choose the method that best fits your conditions: available disk space, remote file access, network throughput, and so on.

Consider Downtime During Migration

If you want to eliminate database down time, your migration plan must include Oracle GoldenGate. To keep your target (sharded) database in sync with the source (non-sharded) database, you must use Oracle GoldenGate to process the changes that are made to the source database during the migration process.

In addition to the benefit of a zero downtime migration, you might also choose to use Oracle GoldenGate for active-active replication of your sharded database. If you have defined Oracle GoldenGate active-active replication for your sharded database then all of the data migration activity should be restricted to the shard catalog database.

Migrating Data to Sharded Tables With Downtime

If you do not use Oracle GoldenGate, take measures to keep your database as available as possible during the migration, and plan for the downtime.

If you decide to modify your source, non-sharded database tables before migration, your table schema will match your target sharded database schema, and your data migration process will be smoother than if you did not take this pre-migration step. This approach results in an identical row data layout in the source database and in your target sharded database, so you can copy the database content over directly.

  1. Export the data from your database tables.
    expdp uname/pwd@non_sharded_db directory=file_dir
          dumpfile=original_tables.dmp logfile=original_table.log
          INCLUDE=TABLE:\"IN \( \'CUSTOMERS\', \'ORDERS\', \'STOCKITEMS\',
          \'LINEITEMS\' \) \" CONSISTENT=Y CONTENT=DATA_ONLY

    This Data Pump export example is limited to the tables used by the Oracle Sharding sample application. Because the SHARDED and DUPLICATED tables have been already created in the sample, you only export the table content (DATA_ONLY).

  2. Make the export file (original_tables.dmp) accessible by the target database nodes before you start importing the data to the sharded database.

    You can either move this file (or multiple files in the case of parallel export) to the target database system or share the file over the network.

  3. When the export is complete, you can start importing the content to your sharded database.

    The DUPLICATED table (StockItems) must be loaded using the shard catalog. The following is an example of the import command.

    impdp uname/pwd@catnode:1521/ctlg directory=data_pump_dir
          dumpfile=original_tables.dmp logfile=imp.log tables=StockItems
          content=DATA_ONLY
  4. Load the shards directly, or use the shard catalog to load them.

    The best way to load the SHARDED tables (Customers, Orders, and LineItems) is to run the Data Pump on each shard (shrd1,2,…, N) directly. The following is an example of the import command on the first shard.

    impdp uname/pwd@shrdnode:1521/shrd1 directory=data_pump_dir
          dumpfile=original_tables.dmp logfile=imp.log tables=Customers,
          Orders, LineItems content=DATA_ONLY

    Alternatively, you can run Data Pump on the shard catalog to load all of the tables. The following example shows the import command.

    impdp uname/pwd@catnode:1521/ctlg directory=data_pump_dir
          dumpfile=original_tables.dmp logfile=imp.log
          tables=Customers, Orders, LineItems, StockItems
          content=DATA_ONLY

Migrating Data to Sharded Tables Without Downtime

Oracle Sharding provides tight integration between Oracle GoldenGate replication and Data Pump export and import utilities. If Oracle GoldenGate is not currently present on either the source database or target sharded database, then before you install it you should upgrade your database to the latest release of Oracle Database on both the source and target databases. Upgrading the databases provides the maximum available functionality and simplifies the setup. With Oracle Database 12c Release 2 and later you can use integrated capture on the non-sharded source and integrated replicat on the target sharded database.

As long as Data Pump export and import are used to recreate the entire database, Oracle GoldenGate ensures that the migration of database changes during and after the export and import takes place. It is up to you to decide what needs to be replicated by Oracle GoldenGate during the migration process. It is not recommended that you replicate database changes that are not required by the application.

When Oracle GoldenGate is configured for the source and target databases, it is recommended that you do testing using the live data before scheduling the production migration. If the source database is cloned you can use the clone for testing the migration without affecting your production environment.

Figure 9-5 Migration Without Downtime



The migration process using Data Pump combined with Oracle GoldenGate is illustrated above. The bulk of the data migration is performed using Data Pump, moving data directly to the shard catalog and shards. The database changes made during the Data Pump run are collected in Oracle GoldenGate local trail files and moved to the shard catalog database. The changes to sharded tables are propagated from the shard catalog database to the shards.

From the Data Pump perspective, the source database (ORIG) is split into the shard catalog (CTLG) and the shard databases (SHRD1, SHRD2, and SHRD3) using impdp non-sharded and impdp sharded processes. The impdp non-sharded process migrates duplicated tables to the shard catalog database. The three impdp sharded parallel processes migrate the sharded tables directly to shards SHRD1, SHRD2, and SHRD3.

From the Oracle GoldenGate perspective, all of the databases share extract (exsh), pump (pmsh), and replicat (rpsh) process pipelines forked from the Extract root process.

Assuming that you have prepared the obey files corresponding to the diagram above, a GGSCI terminal session for Oracle GoldenGate pipeline from the source database node (orignode) to shard catalog node (catnode) would look like the following example.

view params ./dirprm/add_exsh_2pumps.oby
-- add a change data extract process group named exsh 
-- exsh reads DUPLICATED and SHARDED tables from orig database redo logs
add extract exsh, tranlog, begin now
-- associate the trail file as output from exsh process group
add exttrail ./dirdat/et, extract exsh
-- add SHARDED and DUPLICATED change data extract pump process pmsh
-- pmsh copies local trail data to the catnode remote trail location
add extract pmsh, exttrailsource ./dirdat/et
-- associate the remote trail with pmsh
add rmttrail ./dirdat/rt, extract pmsh
-- connect to the database and add table level supplemental logging for:
-- Customers, Orders, LineItems, and StockItems tables
add trandata uname.Customers
add trandata uname.Orders
add trandata uname.LineItems
add trandata uname.StockItems

Run the GGSCI obey command for the pipeline, followed by info all.

obey ./dirprm/add_exsh_2pumps.oby
info all

You should see the extract processes for the non-sharded and sharded pipeline initialized, and waiting in stopped status.

…
EXTRACT STOPPED exsh …
EXTRACT STOPPED pmsh …
…

Check the parameter file for the Extract process group, exsh, which handles the sharded and duplicated tables.

view params exsh
-- first line must be extract followed by the group name
extract exsh
-- login info to get metadata
userid uname@orignode, password pwd
-- export is writing to trail info
exttrail ./dirdat/et
-- checkpoint time interval with source
checkpointsecs 1
-- source table
table uname.Customers
table uname.Orders
table uname.LineItems
table uname.StockItems

View parameters for the Pump process group for all tables, pmsh.

view params pmsh
-- first line must be extract followed by the group name
extract pmsh
-- no need to log into the database 
passthru
-- connect to remote host, write and talk to the manager there
rmthost catnode, mgrport 7810
-- where is the trail on remote host
rmttrail ./dirdat/rt
-- checkpoint time interval with target
checkpointsecs 1
-- tables
table uname.Customers
table uname.Orders
table uname.LineItems
table uname.StockItems

The Oracle GoldenGate Pump process is created under the assumption that the manager process on the Shard catalog node (catnode) uses port number 7810. This is a fair assumption because the shard catalog and shard databases run on separate machines.

Check the manager process definition on the shard catalog node (catnode).

view params mgr
PORT 7810
-- used by the PUMP process on the source side for the collector on the target
DYNAMICPORTLIST 8000-8010
SYSLOG NONE

On the shard catalog node (catnode), look at the prepared Replicat process obey command file.

view params ./dirprm/add_rpsh.oby
-- connect to the database
dblogin userid uname@catnode, password pwd
-- add checkpoint table
add checkpointtable uname.gg_checkpoint
-- add replicat process rpsh that will convert remote trail into SQL continuously
add replicat rpsh, exttrail ./dirdat/rt, checkpointtable uname.gg_checkpoint

If you modified the source database to match the sharded row data layout in preparation for migration, you might have introduced invisible columns. Invisible columns can be preserved during the replication process by adding MAPINVISIBLECOLUMNS as a replicat process parameter.

Run the obey file ./dirprm/add_rpsh.oby on the shard catalog node (catnode).

At this point the system is configured for data migration without downtime. The bulk of the load is performed by the Data Pump export (expdp) and import (impdp) database utilities. Any database changes made during the export and import processes are synchronized by the Oracle GoldenGate processes. Note that the Oracle GoldenGate Pump process has nothing to do with any of the Data Pump processes.

Before starting the Data Pump export, the Oracle GoldenGate replication process must be provided with instantiation Commit Sequence Numbers (CSNs) for each table that is a part of the Data Pump export. As described earlier, this can be done by running expdp with CONSISTENT=Y and FLASHBACK_SCN=scn_num. The FLASHBACK_SCN value can be obtained with the following statement.

SELECT current_scn from v$database;

Because expdp is run with CONSISTENT=Y, all table images appear “as of scn_num”, so that the replication process can be started from the same, in this case, CSN number.

For the replicat on the shard catalog node, the appropriate GGSCI command might look like this.

START REPLICAT rpsh, AFTERCSN scn_num

The simpler way to do this is to use the ADD SCHEMATRANDATA GGSCI command on the source node. This command populates the system tables and views so that the instantiation CSNs can be used on the import. This way the Oracle GoldenGate CSN becomes synchronized with the Data Pump SCN by matching the two stamps representing the committed version of the database. In other words, FLASHBACK_SCN for the export process and AFTERCSN for the replicat process are defined automatically, as shown here.

ADD SCHEMATRANDATA uname PREPARESCN ALLCOLS

The ADD SCHEMATRANDATA command enables schema-level supplemental logging for all of the current and future tables in the given 'uname' schema of the ORIG database to automatically log a superset of keys that Oracle GoldenGate uses for row identification. The PREPARESCN parameter instructs the Data Pump export (expdp) to automatically generate actions to set the instantiation CSN (GGSCI command, SET_INSTANTIATION_CSN) for each table at the target upon import (impdp). The ALLCOLS parameter enables the unconditional supplemental logging of all supported key and non-key columns for all current and future tables in the given schema. This option enables the logging of the keys required to compute dependencies, plus columns that are required for filtering, conflict resolution, or other purposes. It is important to note that the sharding-related data migration is limited to the specific set of tables, and other (incremental) database changes are not propagated to the shard catalog.

The replication on the target database should be stopped before starting the Data Pump export. This should always be the case because the target database is created from scratch. The GGSCI command to stop the replicat process on the shard catalog node is shown here.

STOP REPLICAT rpsh

This command preserves the state of synchronization for the next time the replicat process starts, and it ensures that the Oracle GoldenGate manager processes do not automatically start the replicat process.

The extract process on the source database should be active before starting the Data Pump export. Check if extract is already active by using the INFO EXTRACT or STATUS EXTRACT command. The following command starts the extract process, if it is not already started.

START EXTRACT exsh, BEGIN NOW

From this point on, the extract process collects the changes to the source database for all of the tables involved in the sharding process, and you can safely initiate Data Pump export.

expdp uname/pwd@orignode full=Y directory=expdir dumpfile=sample.dmp
      logfile=sample.log

It is good practice to verify that the export utility (expdp) FLASHBACK_SCN parameter was added automatically. You should be able to find the message "FLASHBACK automatically enabled to preserve database integrity" in the expdp command output. After the export completes, immediately continue with the Data Pump import on the target databases, as described earlier, starting with the shard catalog.

impdp uname/pwd@catnode:1521/ctlg directory=data_pump_dir
      dumpfile=sample.dmp logfile=imp.log
      tables=Customers,Orders,LineItems,StockItems content=DATA_ONLY

At this point it is safe to start replicat processes on the shard catalog node.

START REPLICAT rpsh

From this point on, the selected set of tables in the source database (Customers, Orders, LineItems, and StockItems) is kept in sync with the Customers, Orders, and LineItems tables in the shards and the StockItems duplicated table in the shard catalog database. It is good practice, after starting the replicat processes, to take a look at their report files. Verify that the replicat process was aware of the SCN or CSN number in effect while the export was in progress, and that it knows that any changes after that SCN need to be applied on the target tables; look for a message such as "Instantiation CSN filtering is enabled on table uname.Customers, uname.Orders, …" to verify.

9.7 Migrating Your Application

The sharded database operating environment empowers applications with direct access to shards. This feature provides true linear scalability, but it comes with a small price—a slight change to the application code.

The examples that follow show you how to migrate your application. The examples are a skeleton of the sample application for which we migrated the database schema and data. The key parts of the sample application include order management and reporting functionality, contained in the Java class POManager. The first static method in this class, addCustomer(), introduces a new row into the Customers table. The column values are passed in to this method using a parameter list, as shown here.

import java.sql.*;
import java.io.*;
import oracle.jdbc.driver.*;

public class POManager {
  public static void addCustomer (int custNo, String custName,
    String street, String city, String state, String zipCode,
    String phoneNo) throws SQLException {
      String sql = "INSERT INTO Customers VALUES (?,?,?,?,?,?,?)";
      try {
        Connection conn =
          DriverManager.getConnection("jdbc:default:connection:");
        PreparedStatement pstmt = conn.prepareStatement(sql);
        pstmt.setInt(1, custNo);
        pstmt.setString(2, custName);
        pstmt.setString(3, street);
        pstmt.setString(4, city);
        pstmt.setString(5, state);
        pstmt.setString(6, zipCode);
        pstmt.setString(7, phoneNo);
        pstmt.executeUpdate(); 
        pstmt.close();
      } catch (SQLException e) {System.err.println(e.getMessage());}
  }

The second static method in the POManager class, addStockItem(), adds a row in the StockItem table. The column values are passed as parameter values, as shown in the following example.

  public static void addStockItem (int stockNo, String description,
    float price) throws SQLException {
      String sql = "INSERT INTO StockItems VALUES (?,?,?)";
      try {
        Connection conn =
          DriverManager.getConnection("jdbc:default:connection:");
        PreparedStatement pstmt = conn.prepareStatement(sql);
        pstmt.setInt(1, stockNo);
        pstmt.setString(2, description);
        pstmt.setFloat(3, price);
        pstmt.executeUpdate(); 
        pstmt.close();
      } catch (SQLException e) {System.err.println(e.getMessage());}
  }

The third static method in the POManager class, enterOrder(), adds a row into the Orders table. The column values are provided in a parameter list, as shown here.

  public static void enterOrder (int orderNo, int custNo,
    String orderDate, String shipDate, String toStreet,
    String toCity, String toState, String toZipCode)
    throws SQLException {
      String sql = "INSERT INTO Orders VALUES (?,?,?,?,?,?,?,?)";
      try {
        Connection conn =
          DriverManager.getConnection("jdbc:default:connection:");
        PreparedStatement pstmt = conn.prepareStatement(sql);
        pstmt.setInt(1, orderNo);
        pstmt.setInt(2, custNo);
        pstmt.setString(3, orderDate);
        pstmt.setString(4, shipDate);
        pstmt.setString(5, toStreet);
        pstmt.setString(6, toCity);
        pstmt.setString(7, toState);
        pstmt.setString(8, toZipCode);
        pstmt.executeUpdate(); 
        pstmt.close();
      } catch (SQLException e) {System.err.println(e.getMessage());}
  }

The next static method in the POManager class, addLineItem(), adds a row in the LineItems table. The column values are passed in as parameter values, as shown in the following example.

public static void addLineItem (int lineNo, int orderNo,
    int stockNo, int quantity, float discount) throws SQLException {
      String sql = "INSERT INTO LineItems VALUES (?,?,?,?,?)";
      try {
        Connection conn =
          DriverManager.getConnection("jdbc:default:connection:");
        PreparedStatement pstmt = conn.prepareStatement(sql);
        pstmt.setInt(1, lineNo);
        pstmt.setInt(2, orderNo);
        pstmt.setInt(3, stockNo);
        pstmt.setInt(4, quantity);
        pstmt.setFloat(5, discount);
        pstmt.executeUpdate(); 
        pstmt.close();
      } catch (SQLException e) {System.err.println(e.getMessage());}
  }

The next static method in the POManager class, totalOrders(), produces the total order value for every order in the Orders table. The result set relation is printed out using the printResult() method, as shown here.

public static void totalOrders () throws SQLException {
    String sql = 
      "SELECT O.PONo, ROUND(SUM(S.Price * L.Quantity)) AS TOTAL " +
      "FROM Orders O, LineItems L, StockItems S " +
      "WHERE O.PONo = L.PONo AND L.StockNo = S.StockNo " +
      "GROUP BY O.PONo";
    try {
      Connection conn =
        DriverManager.getConnection("jdbc:default:connection:");
      PreparedStatement pstmt = conn.prepareStatement(sql);
      ResultSet rset = pstmt.executeQuery();
      printResults(rset);
      rset.close();
      pstmt.close();
    } catch (SQLException e) {System.err.println(e.getMessage());}
  }

The helper method, printResults(), shown below, is used to print out the result set relations produced by the totalOrders() method. A reference to the result set relation is passed in as a parameter.

static void printResults (ResultSet rset) throws SQLException {
    String buffer = "";
    try {
      ResultSetMetaData meta = rset.getMetaData();
      int cols = meta.getColumnCount(), rows = 0;
      for (int i = 1; i <= cols; i++) {
        int size = meta.getPrecision(i);
        String label = meta.getColumnLabel(i);
        if (label.length() > size) size = label.length();
        while (label.length() < size) label += " ";
        buffer = buffer + label + "  ";
      }
      buffer = buffer + "\n";
      while (rset.next()) {
        rows++;
        for (int i = 1; i <= cols; i++) {
          int size = meta.getPrecision(i);
          String label = meta.getColumnLabel(i);
          String value = rset.getString(i);
          if (label.length() > size) size = label.length();
          while (value.length() < size) value += " ";
          buffer = buffer + value + "  ";
        }
        buffer = buffer + "\n";
      }
      if (rows == 0) buffer = "No data found!\n";
      System.out.println(buffer);
    } catch (SQLException e) {System.err.println(e.getMessage());}
  }

The checkStockItem() static method, shown below, retrieves all orders, customers, and line item details for the specified stock item. The stock item is passed in as a parameter. The helper method, printResults(), detailed above, is used to print out the result set relations produced by the checkStockItem() method.

public static void checkStockItem (int stockNo)
    throws SQLException {
      String sql = "SELECT O.PONo, O.CustNo, L.StockNo, " + 
        "L.LineNo, L.Quantity, L.Discount " +
        "FROM Orders O, LineItems L " +
        "WHERE O.PONo = L.PONo AND L.StockNo = ?";
      try {
        Connection conn =
          DriverManager.getConnection("jdbc:default:connection:");
        PreparedStatement pstmt = conn.prepareStatement(sql);
        pstmt.setInt(1, stockNo);
        ResultSet rset = pstmt.executeQuery();
        printResults(rset);
        rset.close();
        pstmt.close();
      } catch (SQLException e) {System.err.println(e.getMessage());}
  }

The changeQuantity() static method updates the line item quantity for the given order and the stock item. The specific order number and the stock item are provided as input parameters, as shown here.

  public static void changeQuantity (int newQty, int orderNo,
    int stockNo) throws SQLException {
      String sql = "UPDATE LineItems SET Quantity = ? " +
        "WHERE PONo = ? AND StockNo = ?";
      try {
        Connection conn =
          DriverManager.getConnection("jdbc:default:connection:");
        PreparedStatement pstmt = conn.prepareStatement(sql);
        pstmt.setInt(1, newQty);
        pstmt.setInt(2, orderNo);
        pstmt.setInt(3, stockNo);
        pstmt.executeUpdate(); 
        pstmt.close();
      } catch (SQLException e) {System.err.println(e.getMessage());}
  }

The last static method, deleteOrder(), removes the specified order from the Orders table and all associated line items. The order that is to be deleted is specified as the input parameter, as shown in the following example.

public static void deleteOrder (int orderNo) throws SQLException {
    String sql = "DELETE FROM LineItems WHERE PONo = ?";
    try {
      Connection conn =
        DriverManager.getConnection("jdbc:default:connection:");
      PreparedStatement pstmt = conn.prepareStatement(sql);
      pstmt.setInt(1, orderNo);
      pstmt.executeUpdate(); 
      sql = "DELETE FROM Orders WHERE PONo = ?";
      pstmt = conn.prepareStatement(sql);
      pstmt.setInt(1, orderNo);
      pstmt.executeUpdate(); 
      pstmt.close();
    } catch (SQLException e) {System.err.println(e.getMessage());}
  }
}

Now, look at the sample application program code that is modified for sharding. The first thing to note are the additional imports of sharding related Java packages, starting with OracleShardingKey.

import java.sql.*;
import java.io.*;
import oracle.jdbc.driver.*;

//
// import sharding and related packages
//
import oracle.jdbc.OracleShardingKey;
import oracle.jdbc.OracleType;
import oracle.jdbc.pool.OracleDataSource;

// 
// Sample App: order management and reporting
// modified for sharding
//
public class POManager 
{
  // Connection factory for the sharded database used by Sample App
  private static OracleDataSource ods;

  // 
  // Construct POManager class using Sharded database connection properties.
  // Use service name when connecting to sharded database, something like:
  // "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(HOST=myhost)(PORT=3216)”
  //   “(PROTOCOL=tcp))(CONNECT_DATA=(SERVICE_NAME=myservice)(REGION=east)))"
  //
  public POManager(String yourURL, String yourUser, String yourPwd) 
    throws SQLException
  {
      ods = new OracleDataSource();
      ods.setURL(yourURL);
      ods.setUser(yourUser);
      ods.setPassword(yourPwd);
 } // POManager

As shown above, the POManager class now contains the OracleDataSource factory for Connection objects. The following two methods show you how to use the connection factory to produce direct and proxy routing connections. The getCatConn() method returns a connection to the sharding catalog. The getShardConn() method returns the connection to the shard that matches the sharding key provided as a parameter.

 //
 // Connect to the Sharding Catalog database.
 //
 public static Connection getCatConn() throws SQLException
 {
    Connection catConn = ods.getConnection();
    return catConn;
 } // getCatConn

 //
 // Connect to Shard database using sharding key.
 //
 public static Connection getShardConn(int custNo) throws SQLException
 {
     OracleShardingKey shardKey =
       ods.createShardingKeyBuilder().subkey(custNo, JDBCType.NUMERIC).build();
     Connection shardConn =
       ods.createConnectionBuilder().shardingKey(shardKey).build();
     return shardConn;
 } // getShardConn

With all of this in mind, you would rewrite the addCustomer() method for the sharding environment, using getShardConn(), as shown here.

  //
  // Connect to Shard database to add a customer.
  //
  public static void addCustomer (int custNo, String custName,
    String street, String city, String state, String zipCode,
    String phoneNo) throws SQLException {
      String sql = "INSERT INTO Customers VALUES (?,?,?,?,?,?,?)";
      try {
        Connection conn = getShardConn(custNo);
        PreparedStatement pstmt = conn.prepareStatement(sql);
        pstmt.setInt(1, custNo);
        pstmt.setString(2, custName);
        pstmt.setString(3, street);
        pstmt.setString(4, city);
        pstmt.setString(5, state);
        pstmt.setString(6, zipCode);
        pstmt.setString(7, phoneNo);
        pstmt.executeUpdate(); 
        pstmt.close();
      } catch (SQLException e) {System.err.println(e.getMessage());}
  } // addCustomer

In addStockItem(), you use getCatConn() to connect to the shard catalog database and insert into the duplicated table, StockItems.

  //
  // Connect to Sharding Catalog to add a stock item.
  //
  public static void addStockItem (int stockNo, String description,
    float price) throws SQLException {
      String sql = "INSERT INTO StockItems VALUES (?,?,?)";
      try {
        Connection conn = getCatConn();
        PreparedStatement pstmt = conn.prepareStatement(sql);
        pstmt.setInt(1, stockNo);
        pstmt.setString(2, description);
        pstmt.setFloat(3, price);
        pstmt.executeUpdate(); 
        pstmt.close();
      } catch (SQLException e) {System.err.println(e.getMessage());}
  } // addStockItem

The Orders table is a child of the Customers table. To insert into the Orders table, connect to the shard database based on the provided sharding key, as shown in the following example.

  //
  // Connect to Shard database to add an order for a customer.
  //
  public static void enterOrder (int orderNo, int custNo,
    String orderDate, String shipDate, String toStreet,
    String toCity, String toState, String toZipCode)
    throws SQLException {
      String sql = "INSERT INTO Orders VALUES (?,?,?,?,?,?,?,?)";
      try {
        Connection conn = getShardConn(custNo);
        PreparedStatement pstmt = conn.prepareStatement(sql);
        pstmt.setInt(1, orderNo);
        pstmt.setInt(2, custNo);
        pstmt.setString(3, orderDate);
        pstmt.setString(4, shipDate);
        pstmt.setString(5, toStreet);
        pstmt.setString(6, toCity);
        pstmt.setString(7, toState);
        pstmt.setString(8, toZipCode);
        pstmt.executeUpdate(); 
        pstmt.close();
      } catch (SQLException e) {System.err.println(e.getMessage());}
  } // enterOrder

Notice that for the root of the table family, Customers, and its immediate child, Orders, the sharding key is provided as a parameter. For levels below the immediate child you might not have the full level key, so you must retrieve it from the immediate parent. In the following example, the helper method getCustomerFromOrder() retrieves the sharding key, custNo, from the Orders table based on the Orders table key value, orderNo. Every child table row is expected to have exactly one parent row; that is why an integrity violation exception is raised for a child without a parent, or a child with more than one parent.

  //
  // Determine which shard the order is in
  //
  static int getCustomerFromOrder(int orderNo) throws SQLException
  {
     String sql = "SELECT O.CustNo FROM Orders O " +
                  "WHERE O.PONo = ?";
     int custNo = 0;
     int rsSize = 0;
     try {
        Connection conn = getCatConn();
        PreparedStatement pstmt = conn.prepareStatement(sql);
        pstmt.setInt(1, orderNo);
        ResultSet rset = pstmt.executeQuery();
        while (rset.next() && rsSize < 3) {
          custNo = rset.getInt("CustNo");
          rsSize++;
        }
        rset.close();
        pstmt.close();
      } catch (SQLException e) {System.err.println(e.getMessage());}
      if (rsSize == 0) {
        throw new
          SQLIntegrityConstraintViolationException(
                                    "No matching parent level key");
      }
      if (rsSize > 1) {
        throw new
          SQLIntegrityConstraintViolationException(
                                    "More than one parent level key");
      }
      return custNo;
  } // getCustomerFromOrder

Note:

Your code should not make any assumptions about the number of shards in the sharded database.

The following example shows the rewritten addLineItem() method that was provided in the original POManager class, now using the getCustomerFromOrder() helper method. For a given orderNo, it queries the shard catalog for the matching custNo value; this part of the query is propagated to all of the shards using proxy routing. The helper function returns a single custNo value, and based on this value, direct routing to the shard is used to insert the LineItems table row.

  //
  // Get customer (parent) from the catalog, then insert into a shard!
  // 
  public static void addLineItem (int lineNo, int orderNo,
    int stockNo, int quantity, float discount) throws SQLException {
      String sql = "INSERT INTO LineItems VALUES (?,?,?,?,?,?)";
      try {
        int custNo = getCustomerFromOrder(orderNo);
        Connection conn = getShardConn(custNo);
        PreparedStatement pstmt = conn.prepareStatement(sql);
        pstmt.setInt(1, custNo);
        pstmt.setInt(2, lineNo);
        pstmt.setInt(3, orderNo);
        pstmt.setInt(4, stockNo);
        pstmt.setInt(5, quantity);
        pstmt.setFloat(6, discount);
        pstmt.executeUpdate(); 
        pstmt.close();
      } catch (SQLException e) {System.err.println(e.getMessage());}
  } // addLineItem

  //
  // You’ve got whole level key handy, insert into a shard directly.
  // 
  public static void addLineItemWithinParent (int lineNo, int orderNo,
    int custNo, int stockNo, int quantity, float discount) 
    throws SQLException 
  {
      String sql = "INSERT INTO LineItems VALUES (?,?,?,?,?,?)";
      try {
        Connection conn = getShardConn(custNo);
        PreparedStatement pstmt = conn.prepareStatement(sql);
        pstmt.setInt(1, custNo);
        pstmt.setInt(2, lineNo);
        pstmt.setInt(3, orderNo);
        pstmt.setInt(4, stockNo);
        pstmt.setInt(5, quantity);
        pstmt.setFloat(6, discount);
        pstmt.executeUpdate(); 
        pstmt.close();
      } catch (SQLException e) {System.err.println(e.getMessage());}
  } // addLineItemWithinParent

Note:

Most of the time, the full level key is available in the application context. That is why the additional method addLineItemWithinParent() is introduced; it connects directly to the shard based on the leading column, custNo, of the LineItems table composite level key, eliminating the round trip to the shard catalog. Whenever possible, avoid expensive sharding programming practices similar to the getCustomerFromOrder() helper function.

A majority of aggregate queries must be executed using the shard catalog connection. The shard catalog database uses proxy routing to collect partial results from the shards, and the final aggregation is computed from those partial results. This is why the totalOrders() method introduced in the original POManager class is rewritten to connect to the shard catalog database, as shown here.

  //
  // xshard aggregate connects to the shard catalog
  //
  public static void totalOrders () throws SQLException {
    String sql = 
      "SELECT O.PONo, ROUND(SUM(S.Price * L.Quantity)) AS TOTAL " +
      "FROM Orders O, LineItems L, StockItems S " +
      "WHERE O.PONo = L.PONo AND L.StockNo = S.StockNo " +
      "GROUP BY O.PONo";
    try {
      Connection conn = getCatConn();
      PreparedStatement pstmt = conn.prepareStatement(sql);
      ResultSet rset = pstmt.executeQuery();
      printResults(rset);
      rset.close();
      pstmt.close();
    } catch (SQLException e) {System.err.println(e.getMessage());}
  } // totalOrders

The printResults() helper function, introduced previously, does not depend on the database structure, so no modifications are necessary.

  //
  // helper function – no change required
  //
  static void printResults (ResultSet rset) throws SQLException {
    String buffer = "";
    try {
      ResultSetMetaData meta = rset.getMetaData();
      int cols = meta.getColumnCount(), rows = 0;
      for (int i = 1; i <= cols; i++) {
        int size = meta.getPrecision(i);
        String label = meta.getColumnLabel(i);
        if (label.length() > size) size = label.length();
        while (label.length() < size) label += " ";
        buffer = buffer + label + "  ";
      }
      buffer = buffer + "\n";
      while (rset.next()) {
        rows++;
        for (int i = 1; i <= cols; i++) {
          int size = meta.getPrecision(i);
          String label = meta.getColumnLabel(i);
          String value = rset.getString(i);
          if (label.length() > size) size = label.length();
          while (value.length() < size) value += " ";
          buffer = buffer + value + "  ";
        }
        buffer = buffer + "\n";
      }
      if (rows == 0) buffer = "No data found!\n";
      System.out.println(buffer);
    } catch (SQLException e) {System.err.println(e.getMessage());}
  } // printResults

The values in the StockItems table key column, stockNo, can potentially match order rows on any of the shards. That is why you must modify the checkStockItem() method, introduced in the original POManager class, to connect to the shard catalog. The shard catalog database returns a union of all of the rows returned by the local joins performed on each shard.

  //
  // xshard query matching duplicated table 
  //
  public static void checkStockItem (int stockNo)
    throws SQLException {
      String sql = "SELECT O.PONo, O.CustNo, L.StockNo, " + 
        "L.LineNo, L.Quantity, L.Discount " +
        "FROM Orders O, LineItems L " +
        "WHERE O.PONo = L.PONo AND L.StockNo = ?";
      try {
        Connection conn = getCatConn();
        PreparedStatement pstmt = conn.prepareStatement(sql);
        pstmt.setInt(1, stockNo);
        ResultSet rset = pstmt.executeQuery();
        printResults(rset);
        rset.close();
        pstmt.close();
      } catch (SQLException e) {System.err.println(e.getMessage());}
  } // checkStockItem

The changeQuantity() method, introduced in the original POManager class, updates the grandchild table, LineItems. Again, use the getCustomerFromOrder() helper method to obtain the sharding key so your application can connect to the correct shard to perform the update. As with the addLineItem() modification, you should expect that the custNo column value is available in the application context, in which case you should use changeQuantityWithinParent() rather than changeQuantity(), saving the round trip to the shard catalog.

  //
  // Get customer (parent) from the catalog then update a shard!
  //
  public static void changeQuantity (int newQty, 
                                     int orderNo, int stockNo) 
     throws SQLException 
  {
      String sql = "UPDATE LineItems SET Quantity = ? " +
        "WHERE CustNo = ? AND PONo = ? AND StockNo = ?";
      try {
        int custNo = getCustomerFromOrder(orderNo);
        Connection conn = getShardConn(custNo);
        PreparedStatement pstmt = conn.prepareStatement(sql);
        pstmt.setInt(1, newQty);
        pstmt.setInt(2, custNo);
        pstmt.setInt(3, orderNo);
        pstmt.setInt(4, stockNo);
        pstmt.executeUpdate(); 
        pstmt.close();
      } catch (SQLException e) {System.err.println(e.getMessage());}
  } // changeQuantity

  //
  // You’ve got the full level key handy, update shard directly. 
  //
  public static void changeQuantityWithinParent (int newQty, 
                       int custNo, int orderNo, int stockNo) 
      throws SQLException 
  {
      String sql = "UPDATE LineItems SET Quantity = ? " +
        "WHERE CustNo = ? AND PONo = ? AND StockNo = ?";
      try {
        Connection conn = getShardConn(custNo);
        PreparedStatement pstmt = conn.prepareStatement(sql);
        pstmt.setInt(1, newQty);
        pstmt.setInt(2, custNo);
        pstmt.setInt(3, orderNo);
        pstmt.setInt(4, stockNo);
        pstmt.executeUpdate(); 
        pstmt.close();
      } catch (SQLException e) {System.err.println(e.getMessage());}
  } // changeQuantityWithinParent

The modification to the last method, deleteOrder(), follows the same guidelines applied to addLineItem() and changeQuantity(). Before deleting rows from the LineItems table, your application first looks in the shard catalog for the custNo sharding key value corresponding to the requested order. Once you have the sharding key, connect to the shard and perform the deletes. Again, prefer the deleteOrderWithinParent() method when the sharding key value is available in the application context.

  //
  // Get customer (parent) first from the catalog, delete in a shard!
  //
  public static void deleteOrder (int orderNo) throws SQLException 
  {
    String sql = "DELETE FROM LineItems WHERE CustNo = ? AND PONo = ?";
    try {
      int custNo = getCustomerFromOrder(orderNo);
      Connection conn = getShardConn(custNo);
      PreparedStatement pstmt = conn.prepareStatement(sql);
      pstmt.setInt(1, custNo);
      pstmt.setInt(2, orderNo);
      pstmt.executeUpdate(); 
      sql = "DELETE FROM Orders WHERE CustNo = ? AND PONo = ?";
      pstmt = conn.prepareStatement(sql);
      pstmt.setInt(1, custNo);
      pstmt.setInt(2, orderNo);
      pstmt.executeUpdate(); 
      pstmt.close();
    } catch (SQLException e) {System.err.println(e.getMessage());}
  } // deleteOrder

  //
  // You’ve got whole level key handy, delete in shard directly
  //
  public static void deleteOrderWithinParent (int custNo, 
                                              int orderNo) 
      throws SQLException 
  {
    String sql = "DELETE FROM LineItems WHERE CustNo = ? AND PONo = ?";
    try {
      Connection conn = getShardConn(custNo);
      PreparedStatement pstmt = conn.prepareStatement(sql);
      pstmt.setInt(1, orderNo);
      pstmt.executeUpdate(); 
      sql = "DELETE FROM Orders WHERE PONo = ?";
      pstmt = conn.prepareStatement(sql);
      pstmt.setInt(1, custNo);
      pstmt.setInt(2, orderNo);
      pstmt.executeUpdate(); 
      pstmt.close();
    } catch (SQLException e) {System.err.println(e.getMessage());}
  } // deleteOrderWithinParent


} // POManager
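
To tie the pieces together, the following is a minimal, illustrative driver showing how an application might call the sharded POManager methods. The connect string, service name, credentials, key values, and date strings are placeholders, not part of the sample schema; the date strings also assume a matching session NLS date format.

//
// Illustrative driver for the sharded POManager (all values are placeholders)
//
import java.sql.SQLException;

public class POManagerDemo {
  public static void main(String[] args) throws SQLException {
    // Hypothetical connect string for the sharded database service
    String url =
      "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(HOST=myhost)(PORT=3216)" +
      "(PROTOCOL=tcp))(CONNECT_DATA=(SERVICE_NAME=myservice)(REGION=east)))";
    new POManager(url, "app_user", "app_password"); // initializes the data source

    // Proxy routing through the shard catalog: load the duplicated table
    POManager.addStockItem(2001, "Widget", 9.99f);

    // Direct routing to a single shard, keyed by custNo
    POManager.addCustomer(101, "Acme Corp", "1 Main St", "Anytown",
                          "CA", "94000", "555-0100");
    POManager.enterOrder(5001, 101, "01-JAN-2024", "08-JAN-2024",
                         "1 Main St", "Anytown", "CA", "94000");
    POManager.addLineItemWithinParent(1, 5001, 101, 2001, 10, 0.05f);

    // Cross-shard reports go back through the shard catalog
    POManager.totalOrders();
    POManager.checkStockItem(2001);
  }
} // POManagerDemo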