Cloud Data Integration Connector Toolkit Developer Guide

Code changes for SQL transformation

When you create an endpoint metadata object for the procedure pattern that uses an SQL transformation, you must manually modify the Informatica Connector Toolkit classes and methods to implement the SQL transformation.
The changes are auto-generated if you run the code generation for runtime for the first time or if you create the first endpoint metadata object. If the endpoint metadata object that you create is not the first one, perform the following steps:
  1. Add the following import statements in the RuntimeDataAdapter class:
    import com.informatica.sdk.adapter.metadata.common.typesystem.typelibrary.semantic.iface.StructuralFeature;
    import com.informatica.sdk.adapter.metadata.projection.sinkoperation.semantic.iface.PlatformSink;
    import com.informatica.sdk.adapter.metadata.patternblocks.procedure.semantic.iface.Procedure;
  2. Declare the variables in the RuntimeDataAdapter class as shown below:
    private String procName = null;
    private String sqlTxQuery = null;
    private List<StructuralFeature> sfList = null;
    private Procedure pr = null;
  3. Use the following code in the initDataSession method of the RuntimeDataAdapter class to initialize the variables declared in step 2:
    List<Capability> caps1 = m_asoOperation1.getCapabilities();
    Capability cap1 = caps1.get(0);
    if (cap1 instanceof CallCapability) {
        Projection readProj = m_asoOperation1.getReadProjection();
        NativeSource nativeSrcOp = (NativeSource) (readProj
                .getBaseOperations(OperationTypeEnum.NATIVE_SOURCE).get(0));
        Procedure m_nativeProcedure = (Procedure) ((nativeSrcOp).getNativeRecord());
        PlatformSink platformSink = (PlatformSink) (readProj
                .getBaseOperations(OperationTypeEnum.PLATFORM_SINK).get(0));
        this.sfList = platformSink.getComplexType().getStructuralFeatures();
        this.pr = m_nativeProcedure;
        this.procName = m_nativeProcedure.getName();
        this.sqlTxQuery = pr.getQuery();
        return EReturnStatus.SUCCESS;
    }
  4. Implement the call method in the RuntimeDataAdapter class.
    The following code snippet shows an example for MySQL_Cloud Connector:
    @Override
    public int call(DataSession dataSession, CallAttributes callAttr) throws SDKException {
        InfaUtils pInfaUtils = dataSession.getInfaUtilsHandle();
        logger = pInfaUtils.getLogger();
        MySQL_CloudTableDataConnection conn = (MySQL_CloudTableDataConnection) dataSession.getConnection();
        Connection nativeConn = (Connection) conn.getNativeConnection();
        int paramListSize = pr.getFieldList().size();
        if (this.sqlTxQuery != null && !this.sqlTxQuery.isEmpty()) {
            return executeAdhocSQLQuery(dataSession, callAttr, nativeConn);
        }
        return EReturnStatus.NO_MORE_DATA;
    }
  5. Use the following sample code for the method executeAdhocSQLQuery referenced in step 4:
    /**
     * The SQL query provided by the user in the SQL Tx is executed once for each input.
     * The SQL query provided by the user can be static or dynamic. Dynamic queries contain parameters.
     * The allowed syntax for parameters is ~tablename~ for table names and ?columnname? for column names.
     * Users can provide an input port from the source as a parameter. The connector needs to replace
     * these parameters with values before query execution.
     * SQLError is always the first field in the SQL Tx. The SQLError field is null/empty if the query
     * is executed successfully.
     * If the user selects the RowsEffected checkbox in the SQL Tx, then RowsEffected is the second field
     * in the SQL Tx. The connector developer should populate this field for every row based on the number
     * of rows affected by the executed query.
     * If the SQL Tx is set as an Active transformation, then the connector developer needs to return only
     * one row for each query executed against the input row from the source.
     * MAX_OUT_ROW_COUNT indicates the maximum number of rows the connector should return for each query.
     * If MAX_OUT_ROW_COUNT is 0, then there is no limit on the maximum number of rows the connector
     * should return for each query.
     * In case of an error in query execution, the connector developer should return one row with all
     * null values and the error message in the SQLError field.
     * The connector should return NO_MORE_DATA if all the rows are read, else return SUCCESS.
     **/
    public int executeAdhocSQLQuery(DataSession dataSession, CallAttributes callAttr, Connection nativeConn) {
        int returnStatus;
        String query = this.sqlTxQuery;
        /**
         * Access the SQL Tx properties
         * String isActiveTransformation=this.infaUtils.getExecEnvStringProperty("IS ACTIVE");
         * String maxOutRowCount=this.infaUtils.getExecEnvStringProperty("MAX_OUT_ROW_COUNT");
         **/
        int outputBufferCapacity = callAttr.getOutputBufCapacity();
        // replace any parameters used in the query with values
        for (int i = 0; i < this.inputParamList.size(); i++) {
            String paramDataType = this.inputParamList.get(i).getDataType();
            String paramName = this.inputParamList.get(i).getName();
            DataAttributes dataAttr = new DataAttributes();
            dataAttr.setRowIndex(0);
            dataAttr.setColumnIndex(i);
            // adapter can read their respective types from data session
            String data;
            try {
                data = dataSession.getStringData(dataAttr);
            } catch (SDKException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                return EReturnStatus.FAILURE;
            }
            query = getResolvedAdhocQuery(query, paramName, data);
        }
        // Execute the query
        // rs = st.executeQuery(query);
        List<List<Object>> result = new ArrayList<List<Object>>();
        returnStatus = readDatafromAdhocQuery(dataSession, result, outputBufferCapacity);
        try {
            setProcDataToPlatform(dataSession, result);
        } catch (SDKException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            return EReturnStatus.FAILURE;
        }
        return returnStatus;
    }
  6. Use the following sample code for the getResolvedAdhocQuery method referenced in step 5:
    /**
     * Replaces the parameters in the query with data values from the input ports
     * @param query
     * @param paramName
     * @param data
     * @return
     */
    public String getResolvedAdhocQuery(String query, String paramName, String data) {
        // Sample code to replace parameters in the sql query with values
        String param1 = "~" + paramName + "~";
        String param2 = "~ " + paramName + " ~";
        String param3 = "?" + paramName + "?";
        String param4 = "? " + paramName + " ?";
        // Quote the value once so that it is not wrapped in quotes twice
        // when both ?name? and ? name ? occur in the same query
        String quotedData = "'" + data + "'";
        if (query.indexOf(param1) != -1) {
            query = query.replace(param1, data);
        }
        if (query.indexOf(param2) != -1) {
            query = query.replace(param2, data);
        }
        if (query.indexOf(param3) != -1) {
            query = query.replace(param3, quotedData);
        }
        if (query.indexOf(param4) != -1) {
            query = query.replace(param4, quotedData);
        }
        return query;
    }
  7. Use the following sample code for the readDatafromAdhocQuery method referenced in step 5:
    public int readDatafromAdhocQuery(DataSession dataSession, List<List<Object>> result, int outputBufferCapacity) {
        // Example code to fetch data from the JDBC result set
        /*
        int rowsRead = 0;
        try {
            // Check the capacity before calling rs.next() so that no row is consumed and dropped
            while (rowsRead < outputBufferCapacity && rs.next()) {
                List<Object> datarow = new ArrayList<Object>();
                for (int i = 0; i < connectedFields.size(); i++) {
                    // adapter can read their respective datatypes instead of object
                    datarow.add(rs.getObject(i + 1));
                }
                result.add(datarow);
                rowsRead++;
            }
        } catch (SQLException e) {
            logger.logMessage(EMessageLevel.MSG_FATAL_ERROR, ELogLevel.TRACE_NONE, e.getMessage());
            return EReturnStatus.FAILURE;
        }
        if (rowsRead == outputBufferCapacity)
            return EReturnStatus.SUCCESS;
        */
        // TODO: Replace the sample code with native code to read data from the native object
        // Sample dummy code to populate a projected source with one column of string type and two rows
        List<Object> datarow = new ArrayList<Object>();
        datarow.add("Stan");
        List<Object> datarow2 = new ArrayList<Object>();
        datarow2.add("Mark");
        result.add(datarow);
        result.add(datarow2);
        return EReturnStatus.NO_MORE_DATA;
    }
  8. Use the following sample code for the method setProcDataToPlatform referenced in step 5:
    /**
     * Sets the multiple row data in the data table to the data session buffer
     *
     * <pre>
     * ##################################
     * AUTOGENERATED CODE
     * ##################################
     * </pre>
     *
     * @param dataSession
     *            The dataSession instance, which is the container for SDK
     *            handles.
     * @param dataTable
     *            List of List of Objects. Each List of Objects represents a
     *            single row.
     */
    public void setProcDataToPlatform(DataSession dataSession, List<List<Object>> dataTable) throws SDKException {
        for (int row = 0; row < dataTable.size(); row++) {
            List<Object> rowData = dataTable.get(row);
            for (int col = 0; col < dataTable.get(0).size(); col++) {
                DataAttributes pDataAttributes = new DataAttributes();
                pDataAttributes.setDataSetId(0);
                pDataAttributes.setColumnIndex(col);
                pDataAttributes.setRowIndex(row);
                Object data = rowData.get(col);
                String dataType = sfList.get(col).getDataType();
                String columnName = sfList.get(col).getName();
                if (dataType.equalsIgnoreCase("string") || dataType.equalsIgnoreCase("text")) {
                    if (data == null) {
                        pDataAttributes.setLength(0);
                        dataSession.setStringData((String) data, pDataAttributes);
                    } else {
                        String text = data.toString();
                        int fieldLength = sfList.get(col).getPrecision();
                        if (text.length() > fieldLength) {
                            pDataAttributes.setLength(fieldLength);
                            pDataAttributes.setIndicator(EIndicator.TRUNCATED);
                            dataSession.setStringData(text.substring(0, fieldLength), pDataAttributes);
                        } else {
                            pDataAttributes.setLength(text.length());
                            pDataAttributes.setIndicator(EIndicator.VALID);
                            // Set the full text only here so that a truncated value is not overwritten
                            dataSession.setStringData(text, pDataAttributes);
                        }
                    }
                } else if (dataType.compareToIgnoreCase("double") == 0) {
                    if (data instanceof Double) {
                        pDataAttributes.setIndicator(EIndicator.VALID);
                    } else if (data instanceof Number) {
                        pDataAttributes.setIndicator(EIndicator.VALID);
                        data = ((Number) data).doubleValue();
                    } else if (data == null) {
                        pDataAttributes.setIndicator(EIndicator.NULL);
                    } else {
                        logger.logMessage(EMessageLevel.MSG_ERROR, ELogLevel.TRACE_NONE,
                                "Data for column [" + columnName + "] of type [" + dataType + "] "
                                        + "should be of type [" + Number.class.getName() + "] or its sub-types.");
                        data = null;
                    }
                    dataSession.setDoubleData((Double) data, pDataAttributes);
                } else if (dataType.compareToIgnoreCase("float") == 0) {
                    if (data instanceof Float) {
                        pDataAttributes.setIndicator(EIndicator.VALID);
                    } else if (data instanceof Number) {
                        pDataAttributes.setIndicator(EIndicator.VALID);
                        data = ((Number) data).floatValue();
                    } else if (data == null) {
                        pDataAttributes.setIndicator(EIndicator.NULL);
                    } else {
                        logger.logMessage(EMessageLevel.MSG_ERROR, ELogLevel.TRACE_NONE,
                                "Data for column [" + columnName + "] of type [" + dataType + "] "
                                        + "should be of type [" + Number.class.getName() + "] or its sub-types.");
                        data = null;
                    }
                    dataSession.setFloatData((Float) data, pDataAttributes);
                } else if (dataType.compareToIgnoreCase("long") == 0) {
                    if (data instanceof Long) {
                        pDataAttributes.setIndicator(EIndicator.VALID);
                    } else if (data instanceof Number) {
                        pDataAttributes.setIndicator(EIndicator.VALID);
                        data = ((Number) data).longValue();
                    } else if (data == null) {
                        pDataAttributes.setIndicator(EIndicator.NULL);
                    } else {
                        logger.logMessage(EMessageLevel.MSG_ERROR, ELogLevel.TRACE_NONE,
                                "Data for column [" + columnName + "] of type [" + dataType + "] "
                                        + "should be of type [" + Number.class.getName() + "] or its sub-types.");
                        data = null;
                    }
                    dataSession.setLongData((Long) data, pDataAttributes);
                } else if (dataType.compareToIgnoreCase("short") == 0) {
                    if (data instanceof Short) {
                        pDataAttributes.setIndicator(EIndicator.VALID);
                    } else if (data instanceof Number) {
                        pDataAttributes.setIndicator(EIndicator.VALID);
                        data = ((Number) data).shortValue();
                    } else if (data == null) {
                        pDataAttributes.setIndicator(EIndicator.NULL);
                    } else {
                        logger.logMessage(EMessageLevel.MSG_ERROR, ELogLevel.TRACE_NONE,
                                "Data for column [" + columnName + "] of type [" + dataType + "] "
                                        + "should be of type [" + Number.class.getName() + "] or its sub-types.");
                        data = null;
                    }
                    dataSession.setShortData((Short) data, pDataAttributes);
                } else if (dataType.compareToIgnoreCase("integer") == 0) {
                    if (data instanceof Integer) {
                        pDataAttributes.setIndicator(EIndicator.VALID);
                    } else if (data instanceof Number) {
                        pDataAttributes.setIndicator(EIndicator.VALID);
                        data = ((Number) data).intValue();
                    } else if (data == null) {
                        pDataAttributes.setIndicator(EIndicator.NULL);
                    } else {
                        logger.logMessage(EMessageLevel.MSG_ERROR, ELogLevel.TRACE_NONE,
                                "Data for column [" + columnName + "] of type [" + dataType + "] "
                                        + "should be of type [" + Number.class.getName() + "] or its sub-types.");
                        data = null;
                    }
                    dataSession.setIntData((Integer) data, pDataAttributes);
                } else if (dataType.compareToIgnoreCase("bigint") == 0) {
                    if (data instanceof Long) {
                        pDataAttributes.setIndicator(EIndicator.VALID);
                    } else if (data instanceof Number) {
                        pDataAttributes.setIndicator(EIndicator.VALID);
                        data = ((Number) data).longValue();
                    } else if (data == null) {
                        pDataAttributes.setIndicator(EIndicator.NULL);
                    } else {
                        logger.logMessage(EMessageLevel.MSG_ERROR, ELogLevel.TRACE_NONE,
                                "Data for column [" + columnName + "] of type [" + dataType + "] "
                                        + "should be of type [" + Number.class.getName() + "] or its sub-types.");
                        data = null;
                    }
                    dataSession.setLongData((Long) data, pDataAttributes);
                } else if (dataType.compareToIgnoreCase("date/time") == 0) {
                    if (data instanceof Timestamp) {
                        pDataAttributes.setIndicator(EIndicator.VALID);
                    } else if (data instanceof Date) {
                        pDataAttributes.setIndicator(EIndicator.VALID);
                        Timestamp ts = new Timestamp(((Date) data).getTime());
                        data = ts;
                    } else if (data instanceof Time) {
                        pDataAttributes.setIndicator(EIndicator.VALID);
                        Timestamp ts = new Timestamp(((Time) data).getTime());
                        data = ts;
                    } else if (data == null) {
                        pDataAttributes.setIndicator(EIndicator.NULL);
                    } else {
                        logger.logMessage(EMessageLevel.MSG_ERROR, ELogLevel.TRACE_NONE,
                                "Data for column [" + columnName + "] of type [" + dataType + "]"
                                        + " should be of type [" + Timestamp.class.getName() + "].");
                        data = null;
                    }
                    dataSession.setDateTimeData((Timestamp) data, pDataAttributes);
                } else if (dataType.compareToIgnoreCase("binary") == 0) {
                    if (data instanceof byte[]) {
                        pDataAttributes.setLength(((byte[]) data).length);
                        pDataAttributes.setIndicator(EIndicator.VALID);
                    } else if (data == null) {
                        pDataAttributes.setIndicator(EIndicator.NULL);
                    } else {
                        logger.logMessage(EMessageLevel.MSG_DEBUG, ELogLevel.TRACE_VERBOSE_DATA,
                                "Data for type [" + dataType + "] should be of type ["
                                        + byte[].class.getName() + "].");
                        data = null;
                    }
                    dataSession.setBinaryData((byte[]) data, pDataAttributes);
                } else if (dataType.compareToIgnoreCase("decimal") == 0) {
                    if (data instanceof BigDecimal) {
                        pDataAttributes.setIndicator(EIndicator.VALID);
                    } else if (data == null) {
                        pDataAttributes.setIndicator(EIndicator.NULL);
                    } else {
                        logger.logMessage(EMessageLevel.MSG_DEBUG, ELogLevel.TRACE_VERBOSE_DATA,
                                "Data for type [" + dataType + "] should be of type ["
                                        + BigDecimal.class.getName() + "].");
                        data = null;
                    }
                    dataSession.setBigDecimalData((BigDecimal) data, pDataAttributes);
                }
            }
        }
    }
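
The output rules listed in the comment for executeAdhocSQLQuery in step 5 can be tried out without the toolkit. The following is a minimal sketch in plain Java, assuming rows are held as List<List<Object>> as in the sample code. The class SqlTxOutputRules and its methods capRows and errorRow are hypothetical names and are not part of the Informatica Connector Toolkit. Treating a negative MAX_OUT_ROW_COUNT like 0 is also an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration only; not a toolkit class.
final class SqlTxOutputRules {

    // Applies MAX_OUT_ROW_COUNT to the rows of one query. A value of 0 means
    // "no limit". Negative values are treated the same way (an assumption).
    static List<List<Object>> capRows(List<List<Object>> rows, int maxOutRowCount) {
        if (maxOutRowCount <= 0 || rows.size() <= maxOutRowCount) {
            return rows;
        }
        return new ArrayList<>(rows.subList(0, maxOutRowCount));
    }

    // Builds the single row returned when the query fails: the error message
    // goes in the first field (SQLError) and every other field is null.
    // fieldCount must be at least 1.
    static List<Object> errorRow(int fieldCount, String message) {
        List<Object> row = new ArrayList<>(Collections.<Object>nCopies(fieldCount, null));
        row.set(0, message);
        return row;
    }

    public static void main(String[] args) {
        List<List<Object>> rows = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            List<Object> row = new ArrayList<>();
            row.add(null);          // SQLError is empty on success
            row.add("value" + i);
            rows.add(row);
        }
        System.out.println(capRows(rows, 2).size()); // 2
        System.out.println(capRows(rows, 0).size()); // 3
        System.out.println(errorRow(2, "Table not found"));
    }
}
```

In a connector, the capped rows or the error row would then be passed to setProcDataToPlatform in the same way as the result list in step 5.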
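
The parameter syntax that getResolvedAdhocQuery handles in step 6 can be exercised on its own. The sketch below is a simplified stand-in, not the toolkit method: AdhocQueryParams and resolve are hypothetical names, only the forms ~name~ and ?name? without inner spaces are handled, and doubling embedded single quotes is an added assumption about how the target database escapes string literals.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not a toolkit class.
final class AdhocQueryParams {

    // Replaces ~name~ with the raw value and ?name? with a single-quoted value.
    static String resolve(String query, Map<String, String> values) {
        String resolved = query;
        for (Map.Entry<String, String> entry : values.entrySet()) {
            String name = entry.getKey();
            String value = entry.getValue();
            // Table-name style parameter: substituted verbatim.
            resolved = resolved.replace("~" + name + "~", value);
            // Column-value style parameter: wrapped in single quotes, with any
            // embedded quote doubled so that the literal stays well formed.
            String quoted = "'" + value.replace("'", "''") + "'";
            resolved = resolved.replace("?" + name + "?", quoted);
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> values = new LinkedHashMap<>();
        values.put("tbl", "orders");
        values.put("cust", "O'Brien");
        String query = "SELECT * FROM ~tbl~ WHERE customer = ?cust?";
        // prints: SELECT * FROM orders WHERE customer = 'O''Brien'
        System.out.println(resolve(query, values));
    }
}
```

Because the values are spliced into the SQL text, a value that itself contains a marker such as ~tbl~ would be substituted again by a later parameter. Where the database driver allows it, binding column values through a java.sql.PreparedStatement avoids both that problem and manual quoting.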
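
The batching contract of readDatafromAdhocQuery in step 7 (fill at most outputBufferCapacity rows, return SUCCESS when the buffer is full and NO_MORE_DATA otherwise) can be sketched with a plain iterator in place of a JDBC result set. BatchReader, readBatch, and the two integer constants are hypothetical placeholders; the constants do not reflect the actual values of EReturnStatus.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the read loop; not a toolkit class.
final class BatchReader {
    static final int SUCCESS = 0;       // placeholder status value
    static final int NO_MORE_DATA = 1;  // placeholder status value

    // Moves at most 'capacity' rows from 'source' into 'result'.
    static int readBatch(Iterator<List<Object>> source, List<List<Object>> result, int capacity) {
        int rowsRead = 0;
        // Test the capacity first so that the source is never advanced past
        // a row that cannot be stored in this batch.
        while (rowsRead < capacity && source.hasNext()) {
            result.add(source.next());
            rowsRead++;
        }
        // A full buffer means more rows may remain, so the caller calls again.
        return rowsRead == capacity ? SUCCESS : NO_MORE_DATA;
    }

    public static void main(String[] args) {
        List<List<Object>> all = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            List<Object> row = new ArrayList<>();
            row.add("row" + i);
            all.add(row);
        }
        Iterator<List<Object>> source = all.iterator();
        int status;
        do {
            List<List<Object>> batch = new ArrayList<>();
            status = readBatch(source, batch, 2);
            System.out.println(batch.size() + " row(s), status " + status);
        } while (status == SUCCESS);
    }
}
```

With five source rows and a capacity of two, the loop in main produces batches of two, two, and one rows. As in the sample code, a source whose row count is an exact multiple of the capacity yields one final empty batch before NO_MORE_DATA is returned.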
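
The per-type handling in setProcDataToPlatform in step 8 follows one pattern: accept the exact Java type, widen or convert a compatible type, and fall back to null for anything else. The sketch below isolates that pattern for three cases using only the JDK. ValueCoercion and its methods are hypothetical names, and returning null for a mismatched type mirrors the sample code, which logs an error and then sets a null value.

```java
import java.sql.Timestamp;
import java.util.Date;

// Hypothetical helper for illustration only; not a toolkit class.
final class ValueCoercion {

    // Cuts a string down to the field precision, as the string branch does
    // before it marks the value as truncated.
    static String fitToPrecision(String text, int precision) {
        return text.length() > precision ? text.substring(0, precision) : text;
    }

    // Converts any Number to Long; returns null for null or non-numeric input.
    static Long toLong(Object data) {
        if (data instanceof Number) {
            return Long.valueOf(((Number) data).longValue());
        }
        return null;
    }

    // Normalizes date-like values to Timestamp. java.sql.Date and
    // java.sql.Time both extend java.util.Date, so one check covers them.
    static Timestamp toTimestamp(Object data) {
        if (data instanceof Timestamp) {
            return (Timestamp) data;
        }
        if (data instanceof Date) {
            return new Timestamp(((Date) data).getTime());
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(fitToPrecision("abcdef", 3)); // abc
        System.out.println(toLong(Integer.valueOf(7)));  // 7
        System.out.println(toLong("seven"));             // null
        System.out.println(toTimestamp(new Date(0L)).getTime()); // 0
    }
}
```

In the connector itself, each converted value would still need the matching indicator (VALID, NULL, or TRUNCATED) set on the DataAttributes object before it is written to the data session.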
