Data Integration Connector Toolkit
- Data Integration Connector Toolkit
- All Products
import com.informatica.sdk.adapter.metadata.common.typesystem.typelibrary.semantic.iface.StructuralFeature; import com.informatica.sdk.adapter.metadata.projection.sinkoperation.semantic.iface.PlatformSink; import com.informatica.sdk.adapter.metadata.patternblocks.procedure.semantic.iface.Procedure;
// Name of the native procedure, taken from Procedure.getName().
private String procName;

// SQL text returned by Procedure.getQuery(); call() only runs a query when this is non-empty.
private String sqlTxQuery;

// Structural features of the platform sink's complex type; indexed by output column.
private List<StructuralFeature> sfList;

// Native procedure record obtained from the read projection's native source.
private Procedure pr;
// When the first capability of the operation is a call capability, cache the
// procedure metadata (output fields, procedure record, name and SQL query)
// in the instance fields and report success.
List<Capability> caps1 = m_asoOperation1.getCapabilities();
Capability cap1 = caps1.get(0);
if (cap1 instanceof CallCapability) {
    Projection projection = m_asoOperation1.getReadProjection();

    // The native source of the read projection carries the procedure record.
    NativeSource nativeSource =
            (NativeSource) projection.getBaseOperations(OperationTypeEnum.NATIVE_SOURCE).get(0);
    Procedure nativeProcedure = (Procedure) nativeSource.getNativeRecord();

    // The platform sink describes the fields that are written back to the platform.
    PlatformSink sink =
            (PlatformSink) projection.getBaseOperations(OperationTypeEnum.PLATFORM_SINK).get(0);

    this.sfList = sink.getComplexType().getStructuralFeatures();
    this.pr = nativeProcedure;
    this.procName = nativeProcedure.getName();
    this.sqlTxQuery = this.pr.getQuery();
    return EReturnStatus.SUCCESS;
}
/**
 * Runtime entry point invoked for a call operation.
 *
 * <p>Stores the session logger in the {@code logger} field, obtains the native
 * connection from the data session and, when a non-empty SQL query has been
 * cached in {@code sqlTxQuery}, delegates to
 * {@link #executeAdhocSQLQuery(DataSession, CallAttributes, Connection)}.
 *
 * @param dataSession the data session that provides the utilities handle and the connection
 * @param callAttr    the call attributes forwarded to the query execution
 * @return the status returned by {@code executeAdhocSQLQuery}, or
 *         {@code EReturnStatus.NO_MORE_DATA} when no SQL query is configured
 * @throws SDKException declared by the overridden method
 */
@Override
public int call(DataSession dataSession, CallAttributes callAttr) throws SDKException {
    InfaUtils pInfaUtils = dataSession.getInfaUtilsHandle();
    logger = pInfaUtils.getLogger();

    MySQL_CloudTableDataConnection conn = (MySQL_CloudTableDataConnection) dataSession.getConnection();
    Connection nativeConn = (Connection) conn.getNativeConnection();

    // The procedure's field count was previously read here into an unused local;
    // it only added a needless dereference of 'pr', so it has been dropped.
    if (this.sqlTxQuery != null && !this.sqlTxQuery.isEmpty()) {
        return executeAdhocSQLQuery(dataSession, callAttr, nativeConn);
    }
    return EReturnStatus.NO_MORE_DATA;
}
/** * The SQL Query provided by the user in SQL Tx is executed once for each input. * SQL Query provided by the user can be static or dynamic. Dynamic queries will contain parameters. * Allowed syntax for parameters are ~tablename~ for tablenames and ?columname? for columnnames. Users can provide a input port from the source as a parameter. Connector needs to replace the values for these parameters before query execution. * SQLError is always the first field in SQL Tx. SQLError field is null/empty if query is executed successfully. * If user selects RowsEffected checkbox in SQL Tx, then RowsEffected will be the second field in SQL Tx. Connector developer should populate this field for every row based on the number of rows effected by the query executed. * If SQL Tx is set as a Active transformation, then connector developer needs to return only one row for each query executed against the input row from the source. * MAX_OUT_ROW_COUNT indicates the maximum number of rows connector should return for each query. * If MAX_OUT_ROW_COUNT is 0, then there is no limit on maximum number of rows connector should return for each query. * Incase of error in query execution, connector developer should return one row with all null values and the error message in the SQLError field. 
* Connector should return NO_MORE_DATA if all the rows are read, else return SUCCESS **/ public int executeAdhocSQLQuery(DataSession dataSession, CallAttributes callAttr, Connection nativeConn){ int returnStatus; String query = this.sqlTxQuery; /** * Access the SQL Tx properties * String isActiveTransformation=this.infaUtils.getExecEnvStringProperty("IS ACTIVE"); * String maxOutRowCount=this.infaUtils.getExecEnvStringProperty("MAX_OUT_ROW_COUNT"); **/ int outputBufferCapacity=callAttr.getOutputBufCapacity(); //replace any parameters used in the query with values for(int i=0; i < this.inputParamList.size(); i++) { String paramDataType =this.inputParamList.get(i).getDataType(); String paramName = this.inputParamList.get(i).getName(); DataAttributes dataAttr = new DataAttributes(); dataAttr.setRowIndex(0); dataAttr.setColumnIndex(i); //adapter can read their respective types from data session String data; try { data = dataSession.getStringData(dataAttr); } catch (SDKException e) { // TODO Auto-generated catch block e.printStackTrace(); return EReturnStatus.FAILURE; } query=getResolvedAdhocQuery(query,paramName,data); } //Execute the query //rs = st.executeQuery(query); List<List<Object>> result = new ArrayList<List<Object>>(); returnStatus = readDatafromAdhocQuery(dataSession, result, outputBufferCapacity); try { setProcDataToPlatform(dataSession, result); } catch (SDKException e) { // TODO Auto-generated catch block e.printStackTrace(); return EReturnStatus.FAILURE; } return returnStatus; }
/** * replaces the parameters in the query with data values from the input ports * @param query * @param paramName * @param data * @return */ public String getResolvedAdhocQuery(String query, String paramName, String data){ //Sample code to replace parameters in the sql query with values String param1="~"+ paramName+"~"; String param2="~ "+ paramName+ " ~"; String param3="?"+ paramName + "?"; String param4="? "+ paramName + " ?"; if(query.indexOf(param1) != -1) { query = query.replace(param1, data); } if(query.indexOf(param2) != -1) { query = query.replace(param2, data); } if(query.indexOf(param3) != -1) { data="'"+data+"'"; query = query.replace(param3, data); } if(query.indexOf(param4) != -1) { data="'"+data+"'"; query = query.replace(param4, data); } return query; }
public int readDatafromAdhocQuery(DataSession dataSession, List<List<Object>> result, int outputBufferCapacity){ // Example code to fetch data from the jdbc result set /* int rowsRead = 0; try { while (rs.next() && rowsRead++ < outputBufferCapacity) { List<Object> datarow = new ArrayList<Object>(); for (int i = 0; i < connectedFields.size(); i++) { //adapter can read their respective datatypes instead of object datarow.add(rs.getObject(i+1)); } dataTable.add(datarow); } } catch (SQLException e) { logger.logMessage(EMessageLevel.MSG_FATAL_ERROR, ELogLevel.TRACE_NONE, e.getMessage()); return EReturnStatus.FAILURE; } if (rowsRead == outputBufferCapacity) return EReturnStatus.SUCCESS; */ // TODO: Replace the sample code with native code to read data from the native object // Sample dummy code to populate a projected source with one column of string type and two rows List<Object> datarow = new ArrayList<Object>(); datarow.add("Stan"); List<Object> datarow2 = new ArrayList<Object>(); datarow2.add("Mark"); result.add(datarow); result.add(datarow2); return EReturnStatus.NO_MORE_DATA; }
/**
 * Sets the multiple row data in the data table to the data session buffer.
 *
 * <pre>
 * ##################################
 * AUTOGENERATED CODE
 * ##################################
 * </pre>
 *
 * <p>For every cell the data type and name of the matching entry in {@code sfList} decide
 * which typed setter of the data session is used. Null values, and values whose Java type
 * does not match the declared data type, are written as null with
 * {@code EIndicator.NULL}; a mismatch is also logged. String values longer than the field
 * precision are truncated and flagged with {@code EIndicator.TRUNCATED}. Cells whose data
 * type is not one of the handled names are not written.
 *
 * @param dataSession
 *            The dataSession instance, which is the container for SDK
 *            handles.
 * @param dataTable
 *            List of List of Objects. Each List of Objects represents a
 *            single row. The column count is taken from the first row.
 * @throws SDKException
 *             propagated from the data session setters
 */
public void setProcDataToPlatform(DataSession dataSession, List<List<Object>> dataTable) throws SDKException {
    // The first row defines how many columns are written for every row.
    int columnCount = dataTable.isEmpty() ? 0 : dataTable.get(0).size();

    for (int row = 0; row < dataTable.size(); row++) {
        List<Object> rowData = dataTable.get(row);
        for (int col = 0; col < columnCount; col++) {
            DataAttributes pDataAttributes = new DataAttributes();
            pDataAttributes.setDataSetId(0);
            pDataAttributes.setColumnIndex(col);
            pDataAttributes.setRowIndex(row);

            Object data = rowData.get(col);
            String dataType = sfList.get(col).getDataType();
            String columnName = sfList.get(col).getName();

            if (dataType.equalsIgnoreCase("string") || dataType.equalsIgnoreCase("text")) {
                if (data == null) {
                    pDataAttributes.setLength(0);
                    // Flag the cell as null, as every other data type branch does.
                    pDataAttributes.setIndicator(EIndicator.NULL);
                    dataSession.setStringData((String) data, pDataAttributes);
                } else {
                    String text = data.toString();
                    int fieldLength = sfList.get(col).getPrecision();
                    if (text.length() > fieldLength) {
                        text = text.substring(0, fieldLength);
                        pDataAttributes.setLength(fieldLength);
                        pDataAttributes.setIndicator(EIndicator.TRUNCATED);
                    } else {
                        pDataAttributes.setLength(text.length());
                        pDataAttributes.setIndicator(EIndicator.VALID);
                    }
                    // Written exactly once, so a truncated value is not overwritten
                    // by the full-length text.
                    dataSession.setStringData(text, pDataAttributes);
                }
            } else if (dataType.equalsIgnoreCase("double")) {
                Double value = null;
                if (data instanceof Number) {
                    value = ((Number) data).doubleValue();
                } else if (data != null) {
                    logNumericTypeMismatch(columnName, dataType);
                }
                pDataAttributes.setIndicator(value != null ? EIndicator.VALID : EIndicator.NULL);
                dataSession.setDoubleData(value, pDataAttributes);
            } else if (dataType.equalsIgnoreCase("float")) {
                Float value = null;
                if (data instanceof Number) {
                    value = ((Number) data).floatValue();
                } else if (data != null) {
                    logNumericTypeMismatch(columnName, dataType);
                }
                pDataAttributes.setIndicator(value != null ? EIndicator.VALID : EIndicator.NULL);
                dataSession.setFloatData(value, pDataAttributes);
            } else if (dataType.equalsIgnoreCase("long") || dataType.equalsIgnoreCase("bigint")) {
                Long value = null;
                if (data instanceof Number) {
                    value = ((Number) data).longValue();
                } else if (data != null) {
                    logNumericTypeMismatch(columnName, dataType);
                }
                pDataAttributes.setIndicator(value != null ? EIndicator.VALID : EIndicator.NULL);
                dataSession.setLongData(value, pDataAttributes);
            } else if (dataType.equalsIgnoreCase("short")) {
                Short value = null;
                if (data instanceof Number) {
                    value = ((Number) data).shortValue();
                } else if (data != null) {
                    logNumericTypeMismatch(columnName, dataType);
                }
                pDataAttributes.setIndicator(value != null ? EIndicator.VALID : EIndicator.NULL);
                dataSession.setShortData(value, pDataAttributes);
            } else if (dataType.equalsIgnoreCase("integer")) {
                Integer value = null;
                if (data instanceof Number) {
                    value = ((Number) data).intValue();
                } else if (data != null) {
                    logNumericTypeMismatch(columnName, dataType);
                }
                pDataAttributes.setIndicator(value != null ? EIndicator.VALID : EIndicator.NULL);
                dataSession.setIntData(value, pDataAttributes);
            } else if (dataType.equalsIgnoreCase("date/time")) {
                Timestamp value = null;
                if (data instanceof Timestamp) {
                    value = (Timestamp) data;
                } else if (data instanceof Date) {
                    value = new Timestamp(((Date) data).getTime());
                } else if (data instanceof Time) {
                    value = new Timestamp(((Time) data).getTime());
                } else if (data != null) {
                    logger.logMessage(EMessageLevel.MSG_ERROR, ELogLevel.TRACE_NONE,
                            "Data for column [" + columnName + "] of type [" + dataType + "]"
                                    + " should be a of type [" + Timestamp.class.getName() + "].");
                }
                pDataAttributes.setIndicator(value != null ? EIndicator.VALID : EIndicator.NULL);
                dataSession.setDateTimeData(value, pDataAttributes);
            } else if (dataType.equalsIgnoreCase("binary")) {
                byte[] value = null;
                if (data instanceof byte[]) {
                    value = (byte[]) data;
                    pDataAttributes.setLength(value.length);
                } else if (data != null) {
                    logger.logMessage(EMessageLevel.MSG_DEBUG, ELogLevel.TRACE_VERBOSE_DATA,
                            "Data for type [" + dataType + "] should be a of type ["
                                    + byte[].class.getName() + "].");
                }
                pDataAttributes.setIndicator(value != null ? EIndicator.VALID : EIndicator.NULL);
                dataSession.setBinaryData(value, pDataAttributes);
            } else if (dataType.equalsIgnoreCase("decimal")) {
                BigDecimal value = null;
                if (data instanceof BigDecimal) {
                    value = (BigDecimal) data;
                } else if (data != null) {
                    logger.logMessage(EMessageLevel.MSG_DEBUG, ELogLevel.TRACE_VERBOSE_DATA,
                            "Data for type [" + dataType + "] should be a of type ["
                                    + BigDecimal.class.getName() + "].");
                }
                pDataAttributes.setIndicator(value != null ? EIndicator.VALID : EIndicator.NULL);
                dataSession.setBigDecimalData(value, pDataAttributes);
            }
        }
    }
}

/** Logs that a value destined for a numeric column is not a {@link Number}. */
private void logNumericTypeMismatch(String columnName, String dataType) {
    logger.logMessage(EMessageLevel.MSG_ERROR, ELogLevel.TRACE_NONE,
            "Data for column [" + columnName + "] of type [" + dataType + "] "
                    + "should be a of type [" + Number.class.getName() + "] or its sub-types.");
}