Getting Started with Python
Using the PLC4PY API directly
Currently, you need to install PLC4Py from the GitHub repository instead of PyPI. Once we have decided that PLC4Py is ready for a release, we will publish it to PyPI.
Navigate to the plc4py directory and run the command below. This installs plc4py into your global Python environment.

    pip install .

You now should be able to use PLC4Py in your application. A minimal example is shown below.

    import asyncio

    from plc4py.PlcDriverManager import PlcDriverManager

    connection_string = "modbus://127.0.0.1:5020"
    driver_manager = PlcDriverManager()


    async def communicate_with_plc():
        """
        Asynchronously communicates with a PLC using a PlcDriverManager.

        This function establishes a connection to the PLC defined by the
        connection_string. It builds a read request for a specific item
        ("Random Tag" in this case) using the connection's read request
        builder. The request is then executed asynchronously, and the
        response code is printed.
        """
        print(f"Connecting to plc: {connection_string}")
        async with driver_manager.connection(connection_string) as connection:
            print(f"Connected to {connection_string}")
            with connection.read_request_builder() as builder:
                print(f"Building read request")
                builder.add_item("Random Tag", "4x00001[10]")
                request = builder.build()
                print(f"Request built")
            print(f"Executing request")
            response = await connection.execute(request)
            print(f"Request executed")
            print(f"Response code: {response.response_code}")


    asyncio.run(communicate_with_plc())

PLC4X supports only a small set of functions. This is not because we didn't implement things, but because PLCs themselves generally support only a small set of functions.
The basic functions supported by PLCs and therefore supported by PLC4X are:
- Discover Devices (Not yet available for PLC4Py)
- List resources in the PLC
- Read data
- Write data
- Subscribe for data (Not yet available for PLC4Py)
In general, we try to offer as many features as possible. If a protocol doesn't support subscription-based communication, our goal is to simulate it by polling in the background, so that the difference is transparent to users. (This simulation feature hasn't been implemented yet, but it is on our roadmap.)
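The polling idea can be sketched with plain asyncio. This is only an illustration of the technique, not PLC4Py's API: `poll_for_changes`, `read_value`, `on_change`, and `fake_read` are hypothetical names, and the read is a scripted stand-in rather than a real PLC request.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def poll_for_changes(
    read_value: Callable[[], Awaitable[Any]],
    on_change: Callable[[Any], None],
    interval_s: float,
    max_polls: int,
) -> None:
    """Poll read_value and call on_change only when the value changes."""
    unset = object()  # marker meaning "nothing has been read yet"
    last = unset
    for _ in range(max_polls):
        value = await read_value()
        if last is unset or value != last:
            on_change(value)
            last = value
        await asyncio.sleep(interval_s)


def demo() -> List[int]:
    # Hypothetical stand-in for a PLC read: returns a scripted sequence.
    samples = iter([1, 1, 2, 2, 3])

    async def fake_read() -> int:
        return next(samples)

    seen: List[int] = []
    asyncio.run(
        poll_for_changes(fake_read, seen.append, interval_s=0.0, max_polls=5)
    )
    return seen
```

In this sketch the subscriber is notified only for the first reading and for each later change, which is how a polled tag can appear to the caller as a subscription.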
However, there are some cases in which we can't simulate a feature, or in which features are intentionally disabled:
- If a PLC and/or protocol doesn't support writing or browsing, we simply can't provide this functionality.
Therefore, we use metadata to check programmatically whether a given feature is available.
Reading Data

    # Check if this connection supports reading of data.
    if not connection.is_read_supported():
        logger.error("This connection doesn't support reading.")

As soon as you have ensured that a feature is available, you are ready to build a first request.
This is done by getting a ReadRequestBuilder:

    # Create a new read request:
    # - Give each requested item an alias name
    with connection.read_request_builder() as builder:
        builder.add_item("Random Tag 1", "4x00001[10]")
        builder.add_item("Random Tag 2", "4x00011")
        request = builder.build()

As you can see, you prepare a request by adding tag addresses to the builder and then calling the build method.
If you are using the BrowseApi, you might also have been provided with Tag objects. In that case, simply use add_tag and pass in the PlcTag object instead of the address string.
The request is sent to the PLC by passing it to the connection's execute method:

    try:
        response = await connection.execute(request)
    except TimeOutException:
        # Handle timeout error
        pass
    except Exception:
        # Handle all your other errors
        pass

In general, all requests are executed asynchronously.
As soon as the request is fully processed, the awaited call returns a ReadResponse if everything went right, or raises an exception if there were problems.
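This guide does not say how timeouts are configured. One generic option, shown here as an assumption and not as PLC4Py-specific behaviour, is to wrap any awaitable in the standard library's `asyncio.wait_for`. In the sketch below, `fake_execute` is a hypothetical stand-in for `connection.execute(request)`, and `execute_with_timeout` is an illustrative helper name.

```python
import asyncio


async def fake_execute(delay_s: float) -> str:
    # Hypothetical stand-in for `await connection.execute(request)`.
    await asyncio.sleep(delay_s)
    return "OK"


async def execute_with_timeout(delay_s: float, timeout_s: float) -> str:
    """Await the request, but give up after timeout_s seconds."""
    try:
        return await asyncio.wait_for(fake_execute(delay_s), timeout=timeout_s)
    except asyncio.TimeoutError:
        # wait_for cancels the pending call before raising.
        return "TIMEOUT"


def demo():
    fast = asyncio.run(execute_with_timeout(delay_s=0.0, timeout_s=1.0))
    slow = asyncio.run(execute_with_timeout(delay_s=1.0, timeout_s=0.01))
    return fast, slow
```

Catching `asyncio.TimeoutError` works across Python versions, since newer releases make it an alias of the built-in `TimeoutError`.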
The following example will demonstrate some of the options you have:

    for tag_name in response.tag_names:
        if response.tags[tag_name].response_code == PlcResponseCode.OK:
            num_values: int = len(response.tags[tag_name].value)
            # If it's just one element, output just one single line.
            if num_values == 1:
                logger.info("Value[" + tag_name + "]: " + str(response.tags[tag_name].value))
            else:
                # If it's more than one element, output each in a single row.
                logger.info("Value[" + tag_name + "]:")
                for i in response.tags[tag_name].value.get_list():
                    logger.info(" - " + str(i))
        else:
            # Something went wrong, so output an error message instead.
            logger.error("Error[" + tag_name + "]: " + str(response.tags[tag_name].response_code))

The for-loop demonstrates how to iterate over the tag aliases in the response. For an ordinary read request, these are determined by the items in the request. For a subscription response, however, the response might contain only some of the subscribed items.
Before accessing the data, it is advisable to check if an item was correctly returned.
This is done using the response_code property for a given alias.
If this is PlcResponseCode.OK, everything is fine. Otherwise, it could be one of the following:
- NOT_FOUND
- ACCESS_DENIED
- INVALID_ADDRESS
- INVALID_DATATYPE
- INTERNAL_ERROR
- RESPONSE_PENDING
Assuming the return code was OK, we can continue accessing the data.
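A minimal sketch of separating successful items from failed ones before touching any values. The `ResponseCode` enum and `Item` class below are hypothetical stand-ins that only mirror the code names and the response_code/value attributes used in this guide; they are not imported from PLC4Py, and `split_by_status` is an illustrative helper name.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Any, Dict, Tuple


class ResponseCode(Enum):
    # Stand-in mirroring the code names listed in this guide.
    OK = auto()
    NOT_FOUND = auto()
    ACCESS_DENIED = auto()
    INVALID_ADDRESS = auto()
    INVALID_DATATYPE = auto()
    INTERNAL_ERROR = auto()
    RESPONSE_PENDING = auto()


@dataclass
class Item:
    # Hypothetical stand-in for one entry of response.tags.
    response_code: ResponseCode
    value: Any = None


def split_by_status(tags: Dict[str, Item]) -> Tuple[Dict[str, Any], Dict[str, str]]:
    """Return (values of OK items, code names of failed items)."""
    ok: Dict[str, Any] = {}
    failed: Dict[str, str] = {}
    for name, item in tags.items():
        if item.response_code is ResponseCode.OK:
            ok[name] = item.value
        else:
            failed[name] = item.response_code.name
    return ok, failed


def demo():
    tags = {
        "Random Tag 1": Item(ResponseCode.OK, [1, 2, 3]),
        "Random Tag 2": Item(ResponseCode.INVALID_ADDRESS),
    }
    return split_by_status(tags)
```

Handling the failed items in one place keeps the value-processing code free of per-tag status checks.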
As all PlcValue items support the built-in len() function, the user can check how many items of a given type were returned by calling len(response.tags[tag_name].value).
You can then treat the values in the PlcList as a list using response.tags[tag_name].value.get_list().
Writing Data
In general the structure of code for writing data is extremely similar to that of reading data.
So first it is advisable to check if this connection is even able to write data:

    # Check if this connection supports writing of data.
    if not connection.is_write_supported():
        logger.error("This connection doesn't support writing.")

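Instead of only logging, a capability check can also stop execution early. The sketch below assumes nothing beyond the two check methods shown in this guide; `FakeConnection` and `require_write_support` are hypothetical names used for illustration and are not part of PLC4Py.

```python
from typing import List


class FakeConnection:
    # Hypothetical stand-in exposing the two capability checks used in this guide.
    def __init__(self, can_read: bool, can_write: bool) -> None:
        self._can_read = can_read
        self._can_write = can_write

    def is_read_supported(self) -> bool:
        return self._can_read

    def is_write_supported(self) -> bool:
        return self._can_write


def require_write_support(connection) -> None:
    """Raise early instead of failing later while building a request."""
    if not connection.is_write_supported():
        raise RuntimeError("This connection doesn't support writing.")


def demo() -> List[str]:
    results: List[str] = []
    for conn in (FakeConnection(True, True), FakeConnection(True, False)):
        try:
            require_write_support(conn)
            results.append("writable")
        except RuntimeError:
            results.append("read-only")
    return results
```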
As soon as we are sure that we can write, we get a write request builder from the connection:

    # Create a new write request:
    # - Give each item an alias name
    # - Pass in the data you want to write (for arrays, pass in a list of values)
    with connection.write_request_builder() as builder:
        builder.add_item("Random Tag 1", "4x00001[2]", PlcList([PlcINT(1), PlcINT(2)]))
        builder.add_item("Random Tag 2", "4x00011", PlcINT(1))
        request = builder.build()

As with read requests, the write request is sent to the PLC by passing it to the connection's execute method:

    try:
        response = await connection.execute(request)
    except TimeOutException:
        # Handle timeout error
        pass
    except Exception:
        # Handle all your other errors
        pass

As we don’t have to process any returned data for a write request, it’s enough to check the return code for each tag.

    for tag_name in response.tag_names:
        if response.tags[tag_name].response_code == PlcResponseCode.OK:
            logger.info("Value[" + tag_name + "]: updated")
        else:
            # Something went wrong, so output an error message instead.
            logger.error("Error[" + tag_name + "]: " + str(response.tags[tag_name].response_code))