Cloud Data Integration Connector Toolkit Developer Guide


Code changes for pushdown capability


When you enable full pushdown capability for the connector, you must manually modify the Informatica Connector Toolkit classes and methods to implement the pushdown logic. The connector must validate whether pushdown is supported for the extensionID of the source and generate the pushdown SQL query. Set the pushdown SQL query in the PushdownRuntimeMetadata object and validate whether the SQL query was generated successfully. If the validation is unsuccessful, the mapping runs without full pushdown.
Add the following code changes to the ASOOperationObjMgr.ValidateAll() method in the runtime.semantic package and the OperationAdapter.initDataSourceOperation() method in the runtime.adapter package:
  1. In the ASOOperationObjMgr.ValidateAll() method in the runtime.semantic package, get the PushdownRuntimeMetadata object:
    ASOOperation targetASO = (ASOOperation) currentObj;
    PushdownRuntimeMetadata pushdownMetadata = targetASO.getASOPushdownRuntimeMetadata();
  2. Validate whether pushdown is supported for the source:
    //Get the list of native sources and check for a source with an extensionID other than MySQL
    List<ASOOperation> sourceAso = pushdownMetadata.getAsoOperationList();
    for (ASOOperation aso : sourceAso) {
        String extensionID = ((SD_ASOOperation) aso)._get_imfObject().getBaseASO().getExtensionId();
        if (!extensionID.equalsIgnoreCase("com.infa.adapter.mysql_cloud")) {
            Utils.createPushdownValidationError(currentObj,
                "Ecosystem pushdown is not supported for the combination of connectors MySQL and " + extensionID);
            return false;
        }
    }
  3. Create a target catalog to add all the source objects:
    //Get the NMOTypeContext object to create a catalog and add records
    NMOTypeContext sourceNMO = new NMOTypeContext(
        pushdownMetadata.getAsoOperationList().get(0).getASO().getNmoType(),
        "Provide Target AdapterID");
    //Get the catalog to add the source and lookup records
    Catalog catalog = SDKPushdownUtils.getCatalog(sourceNMO);
  4. Get the list of OperationBase objects from the projection for the source objects:
    List<OperationBase> mergeOperation = new ArrayList<>();
    List<OperationBase> baseOperations = pushdownMetadata.getAsoOperationList().get(0)
        .getReadProjection().getBaseOperations(OperationTypeEnum.JOIN);
    if (baseOperations != null && !baseOperations.isEmpty()) {
        List<OperationBase> ops = pushdownMetadata.getAsoOperationList().get(0)
            .getReadProjection().getBaseOperations(OperationTypeEnum.CONVERSION);
        for (OperationBase op : ops) {
            mergeOperation.add(op);
        }
    } else {
        List<OperationBase> opsNJ = pushdownMetadata.getAsoOperationList().get(0)
            .getReadProjection().getBaseOperations(OperationTypeEnum.CONVERSION);
        for (OperationBase op : opsNJ) {
            mergeOperation.add(op);
        }
    }
  5. Create flat records for all the source objects and add them to the list of source records:
    //List of records for the source
    List<Record> listOfRecords = new ArrayList<Record>();
    for (OperationBase operationBase : mergeOperation) {
        Factory sdkFactory = catalog.getFactory();
        FlatRecord flatRecord = sdkFactory.newFlatRecord(catalog);
        Operation operation = (Operation) operationBase;
        flatRecord.setName(tableName);
        flatRecord.setNativeName(tableName);
        List<FieldBase> fieldList = ((SD_ConversionOperation) operation).getOperationFields();
        //Populate the flat record fields created for this source
        for (FieldBase fld : fieldList) {
            Field field = sdkFactory.newField(catalog);
            if (fld instanceof SD_DerivedField) {
                SD_DerivedField dField = (SD_DerivedField) fld;
                field.setName(dField.getName());
                field.setNativeName(dField.getName());
                field.setDataType(dField.getTypeName());
                field.setPrecision(dField.getPrecision());
                field.setScale(dField.getScale());
                flatRecord.addField(field);
            }
        }
        catalog.addRootRecord(flatRecord);
        //Add it to the list of source records
        listOfRecords.add(flatRecord);
    }
  6. Get the list of OperationBase objects from the projection for the lookup objects:
    List<OperationBase> lookupOpBase = new ArrayList<OperationBase>();
    List<ASOOperation> asoOps = pushdownMetadata.getLookupASOOperationList();
    if (asoOps != null && !asoOps.isEmpty()) {
        for (ASOOperation eachASOOp : asoOps) {
            List<OperationBase> ops = eachASOOp.getReadProjection()
                .getBaseOperations(OperationTypeEnum.CONVERSION);
            lookupOpBase.add(ops.get(0));
        }
    }
  7. Create flat records for all the lookup source objects and add them to the list of lookup records:
    //List of records for the lookup sources
    List<Record> lookupRecord = new ArrayList<Record>();
    for (OperationBase operationBase : lookupOpBase) {
        Factory sdkFactory = catalog.getFactory();
        FlatRecord flatRecord = sdkFactory.newFlatRecord(catalog);
        Operation operation = (Operation) operationBase;
        flatRecord.setName(tableName);
        flatRecord.setNativeName(tableName);
        List<FieldBase> fieldList = ((SD_ConversionOperation) operation).getOperationFields();
        //Populate the flat record fields created for this lookup source
        for (FieldBase fld : fieldList) {
            Field field = sdkFactory.newField(catalog);
            if (fld instanceof SD_DerivedField) {
                SD_DerivedField dField = (SD_DerivedField) fld;
                field.setName(dField.getName());
                field.setNativeName(dField.getName());
                field.setDataType(dField.getTypeName());
                field.setPrecision(dField.getPrecision());
                field.setScale(dField.getScale());
                flatRecord.addField(field);
            }
        }
        catalog.addRootRecord(flatRecord);
        //Add it to the list of lookup records
        lookupRecord.add(flatRecord);
    }
  8. Create a PushdownUtilsContext object that provides all the connector instances needed to generate the pushdown SQL query:
    //MySQL_Cloud is an example of the engine object to be passed as an argument for creating PushdownUtilsContext
    MySQL_CloudEngine mySQLEng = new MySQL_CloudEngine(new MySQL_CloudASOOperationObjMgr());
    PushdownUtilsContext pdctx = null;
    //Create a new PushdownUtilsContext object based on whether a lookup source is used in the mapping
    if (lookupOpBase != null && !lookupRecord.isEmpty()) {
        pdctx = new PushdownUtilsContext(mySQLEng,
            pushdownMetadata.getAsoOperationList().get(0).getReadProjection(),
            ((ASOOperation) currentObj).getWriteProjection(),
            ctx, mergeOperation, listOfRecords, lookupOpBase, lookupRecord);
    } else {
        pdctx = new PushdownUtilsContext(mySQLEng,
            pushdownMetadata.getAsoOperationList().get(0).getReadProjection(),
            ((ASOOperation) currentObj).getWriteProjection(),
            ctx, mergeOperation, listOfRecords);
    }
  9. Get the pushdown SQL query using the PushdownUtilsContext, optimize the pushdown SQL query, and set the optimized pushdown SQL query in the PushdownRuntimeMetadata object:
    try {
        PushdownSql pdoSQL = SDKPushdownUtils.getPushdownSDKSQL(pdctx);
        String finalOptimizedQuery = (String) pdoSQL.getOptimizedSql().get(0);
        if (finalOptimizedQuery == null || finalOptimizedQuery.isEmpty()) {
            Utils.createPushdownValidationError(currentObj, "Pushdown SQL is empty");
            return false;
        }
        pushdownMetadata.setSql(finalOptimizedQuery);
    } catch (ExpressionParseException e) {
        Utils.createPushdownValidationError(currentObj, e.getMessage());
        return false;
    }
    Utils.logMessage(currentObj, "Full Pushdown Query is : " + pushdownMetadata.getSql());
    Utils.logMessage(currentObj, "Pushdown Enabled Successfully");
    return true;
  10. Add any queries that create staging tables to the custom metadata of the PushdownRuntimeMetadata object. Create two lists, createStagingTableQuery and dropStagingTableQuery, when the source or lookup records are added to the target catalog. Add the queries in both lists to the custom metadata of PushdownRuntimeMetadata so that they can be accessed at runtime.
    Perform step 10 before you return the status of SQL query generation in step 9.
    /** Populate the queries that create and drop staging tables in the custom metadata of the
     * PushdownRuntimeMetadata object. The custom metadata can be accessed at runtime to execute these queries. */
    for (int i = 0; i < this.createStagingTableQuery.size(); i++) {
        pushdownMetadata.addCustomMetadata(pushdownMetadata.getCustomMetadataSize(),
            (String) this.createStagingTableQuery.get(i));
    }
    //Add a marker entry that separates the create queries from the drop queries
    pushdownMetadata.addCustomMetadata(pushdownMetadata.getCustomMetadataSize(), "Partition");
    for (int j = 0; j < this.dropStagingTableQuery.size(); j++) {
        pushdownMetadata.addCustomMetadata(pushdownMetadata.getCustomMetadataSize(),
            (String) this.dropStagingTableQuery.get(j));
    }
  11. Access the source connection attributes and the read runtime attributes.
    Perform step 11 before you return the status of SQL query generation in step 9.
    The following code snippet shows an example for MySQL_Cloud Connector:
    private void getSourceConnectionAndReadAttrs(PushdownRuntimeMetadata pushdownMetadata, MetadataObject currentObj) {
        List<OperationBase> opsNativeSource = pushdownMetadata.getAsoOperationList().get(0)
            .getReadProjection().getBaseOperations(OperationTypeEnum.NATIVE_SOURCE);
        for (OperationBase op : opsNativeSource) {
            Map<String, Object> srcConn = SDKPushdownUtils.getNativeSourceConnection(
                pushdownMetadata.getAsoOperationList().get(0).getReadProjection(), (NativeSource) op);
            String nativeSourceName = ((NativeSource) op).getNativeRecord().getNativeName();
            String nativeSourceCatalog = (String) srcConn.get("catalog");
            String nativeSourceHost = (String) srcConn.get("host");
            Utils.logMessage(currentObj, "Connection Attributes for Source " + nativeSourceName);
            Utils.logMessage(currentObj, "Catalog: " + nativeSourceCatalog + " Host: " + nativeSourceHost);
            Map<String, Object> nativeSourceReadAttrs = SDKPushdownUtils.getNativeSourceReadAttributes(
                pushdownMetadata.getAsoOperationList().get(0).getReadProjection(), (NativeSource) op);
            Utils.logMessage(currentObj, "Read Runtime Attributes for Source " + nativeSourceName);
            String nativeSourcePreSQL = (String) nativeSourceReadAttrs.get("preSQL");
            String nativeSourcePostSQL = (String) nativeSourceReadAttrs.get("postSQL");
            int nativeSourceRowOffSet = (int) nativeSourceReadAttrs.get("rowOffSet");
            int nativeSourceRowLimit = (int) nativeSourceReadAttrs.get("rowLimit");
            long nativeSourceThresholdLimit = (long) nativeSourceReadAttrs.get("thresholdLimit");
            Utils.logMessage(currentObj, "PreSQL: " + nativeSourcePreSQL);
            Utils.logMessage(currentObj, "PostSQL: " + nativeSourcePostSQL);
            Utils.logMessage(currentObj, "RowOffSet: " + nativeSourceRowOffSet);
            Utils.logMessage(currentObj, "RowLimit: " + nativeSourceRowLimit);
            Utils.logMessage(currentObj, "ThresholdLimit: " + nativeSourceThresholdLimit);
        }
    }
  12. In the OperationAdapter.initDataSourceOperation() method in the runtime.adapter package, if the DataSourceOperation object is an instance of the write capability and the PushdownRuntimeMetadata object returns an SQL query, process the pushdown SQL query returned by PushdownRuntimeMetadata.getSql(). You must drop the staging tables and close any open connections before you return the status.
    The following code snippet shows an example for MySQL_Cloud Connector:
    //Pushdown query execution for MySQL_Cloud
    PushdownRuntimeMetadata pushdownMeta = dsoHandle.getAdapterDataSourceOperation().getASOPushdownRuntimeMetadata();
    if (pushdownMeta != null) {
        int j = 0;
        for (int i = 0; i < pushdownMeta.getCustomMetadataSize(); i++) {
            String createStagingTableQuery = pushdownMeta.getCustomMetadata(i);
            if (createStagingTableQuery.equalsIgnoreCase("partition")) {
                j = i;
                break;
            }
            writeLog(EMessageLevel.MSG_INFO, "Create Staging Table for PDO with Query:" + createStagingTableQuery);
            tstmt.executeUpdate(createStagingTableQuery);
        }
        writeLog(EMessageLevel.MSG_INFO, "Full PDO Query is:" + pushdownMeta.getSql());
        rowInserted = tstmt.executeUpdate(pushdownMeta.getSql());
        for (int k = j + 1; k < pushdownMeta.getCustomMetadataSize(); k++) {
            writeLog(EMessageLevel.MSG_INFO, "Dropping Staging Table for PDO with Query:" + pushdownMeta.getCustomMetadata(k));
            tstmt.executeUpdate(pushdownMeta.getCustomMetadata(k));
        }
        if (rowInserted > 0) {
            //Set the row statistics for the executed PDO query
            RowsStatInfo rowstatInfoPDO = runtimeConfigHandleInit.getRowsStatInfo(EIUDIndicator.INSERT);
            rowstatInfoPDO.incrementRequested(rowInserted);
            rowstatInfoPDO.incrementAffected(rowInserted);
            rowstatInfoPDO.incrementApplied(rowInserted);
        }
    }
        //Close statements and the connection to the data source before returning
    } catch (Exception e) {
        writeLog(EMessageLevel.MSG_ERROR, e.getMessage());
        return EStatus.FAILURE;
    }
  13. In the runtime.semantic package, override the TypeHandler, QueryVisitor, Engine, and PostProcessor methods to generate and validate the pushdown SQL query.
    The following code snippet shows an example for MySQL_Cloud Connector:
    public MySQL_CloudTypeHandler getTypeHandler() {
        return new MySQL_CloudTypeHandler();
    }

    /**
     * A connector-specific QueryVisitor instance should be returned here by the
     * connector-specific implementation of MD_ASOOperation.
     */
    @Override
    public XVisitor<SdkXNode, SDKExprNode> getSDKXVisitor(XVisitorContext ctx) {
        return new MySQL_CloudXVisitor<SdkXNode>(ctx.getLog(), ctx.getScope());
    }

    @Override
    public SDKPostProcessor getQueryPostProcessor(PostProcessorContext ctx) {
        return new MySQL_CloudPostProc(ctx);
    }

    /**
     * Get the connector-specific engine (currently used by the CDI-e pushdown framework).
     * @return the connector-specific engine
     */
    public SDKEngine getAdapterEngine(SDKEngineContext ctx) {
        return new MySQL_CloudEngine();
    }
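Steps 10 and 12 cooperate through the ordering of the custom metadata: create-staging queries first, a literal "Partition" marker, then drop-staging queries, and the runtime loop splits the list at the marker. The following self-contained sketch mirrors that contract with a plain List<String> standing in for the PushdownRuntimeMetadata custom metadata; the class and method names are illustrative assumptions, and the SDK and JDBC calls are omitted:

```java
import java.util.ArrayList;
import java.util.List;

public class StagingQueryLayout {

    // Validation time (step 10): create queries, then the "Partition"
    // marker, then drop queries.
    static List<String> buildCustomMetadata(List<String> createQueries,
                                            List<String> dropQueries) {
        List<String> metadata = new ArrayList<>(createQueries);
        metadata.add("Partition");
        metadata.addAll(dropQueries);
        return metadata;
    }

    // Runtime (step 12): split at the marker and return all statements in
    // execution order, with the pushdown SQL between the two groups.
    static List<String> executionOrder(List<String> customMetadata,
                                       String pushdownSql) {
        List<String> order = new ArrayList<>();
        int marker = customMetadata.indexOf("Partition");
        order.addAll(customMetadata.subList(0, marker));       // create staging tables
        order.add(pushdownSql);                                // full pushdown query
        order.addAll(customMetadata.subList(marker + 1,
                                            customMetadata.size())); // drop staging tables
        return order;
    }

    public static void main(String[] args) {
        List<String> metadata = buildCustomMetadata(
            List.of("CREATE TABLE stg (id INT)"),
            List.of("DROP TABLE stg"));
        System.out.println(executionOrder(metadata, "INSERT INTO tgt SELECT id FROM stg"));
    }
}
```

Keeping the marker string identical on both sides is the whole contract; if validation and runtime disagree on it, drop queries would run as create queries.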
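The extension-ID validation in step 2 reduces to a single pass over the native sources: full pushdown is rejected as soon as any source belongs to a different connector. A self-contained sketch of that rule, with plain strings standing in for SD_ASOOperation objects (the class and method names here are illustrative; the ID com.infa.adapter.mysql_cloud is the MySQL_Cloud example from the steps above):

```java
import java.util.List;

public class PushdownSupportCheck {

    static final String MYSQL_CLOUD_ID = "com.infa.adapter.mysql_cloud";

    // Returns true only when every source in the mapping uses the
    // connector's own extension ID; otherwise the mapping falls back
    // to a run without full pushdown.
    static boolean isPushdownSupported(List<String> sourceExtensionIds) {
        for (String extensionId : sourceExtensionIds) {
            if (!MYSQL_CLOUD_ID.equalsIgnoreCase(extensionId)) {
                return false; // the real code raises a pushdown validation error here
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isPushdownSupported(List.of(MYSQL_CLOUD_ID, MYSQL_CLOUD_ID)));
        System.out.println(isPushdownSupported(List.of(MYSQL_CLOUD_ID, "com.infa.adapter.other")));
    }
}
```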
