1 /// 2 module dpq.connection; 3 4 //import derelict.pq.pq; 5 import libpq.libpq; 6 7 import dpq.exception; 8 import dpq.result; 9 import dpq.value; 10 import dpq.attributes; 11 import dpq.querybuilder; 12 import dpq.meta; 13 import dpq.prepared; 14 import dpq.smartptr; 15 import dpq.serialisation; 16 17 import dpq.serialisers.array; 18 import dpq.serialisers.composite; 19 20 21 import std..string; 22 import libpq.libpq; 23 import std.conv : to; 24 import std.traits; 25 import std.typecons; 26 27 28 version(unittest) 29 { 30 import std.stdio; 31 Connection c; 32 } 33 34 /** 35 Represents the PostgreSQL connection and allows executing queries on it. 36 37 Examples: 38 ------------- 39 auto conn = Connection("host=localhost dbname=testdb user=testuser"); 40 //conn.exec ... 41 ------------- 42 */ 43 struct Connection 44 { 45 private alias ConnectionPtr = SmartPointer!(PGconn*, PQfinish); 46 47 private ConnectionPtr _connection; 48 private PreparedStatement[string] _prepared; 49 50 /** 51 Connection constructor 52 53 Params: 54 connString = connection string 55 56 See Also: 57 http://www.postgresql.org/docs/9.3/static/libpq-connect.html#LIBPQ-CONNSTRING 58 */ 59 this(string connString) 60 { 61 char* err; 62 auto opts = PQconninfoParse(cast(char*)connString.toStringz, &err); 63 64 if (err != null) 65 throw new DPQException(err.fromStringz.to!string); 66 67 _connection = new ConnectionPtr(PQconnectdb(connString.toStringz)); 68 69 if (status != CONNECTION_OK) 70 throw new DPQException(errorMessage); 71 72 _dpqLastConnection = &this; 73 } 74 75 unittest 76 { 77 c = Connection("host=127.0.0.1 dbname=test user=test"); 78 writeln(" * Database connection with connection string"); 79 assert(c.status == CONNECTION_OK); 80 } 81 82 /** 83 Close the connection manually 84 */ 85 void close() 86 { 87 _connection.clear(); 88 } 89 90 @property const(ConnStatusType) status() 91 { 92 return PQstatus(_connection); 93 } 94 95 /** Returns the name of the database currently selected */ 96 @property const(string) db() 97 { 98 return PQdb(_connection).to!string; 99 } 100 101 /** Returns the name of the current user */ 102 @property const(string) user() 103 { 104 return PQuser(_connection).to!string; 105 } 106 107 /// ditto, but password 108 @property const(string) password() 109 { 110 return PQpass(_connection).to!string; 111 } 112 113 /// ditto, but host 114 @property const(string) host() 115 { 116 return PQhost(_connection).to!string; 117 } 118 119 /// ditto, but port 120 @property const(ushort) port() 121 { 122 return PQport(_connection).fromStringz.to!ushort; 123 } 124 125 /** 126 Executes the given string directly 127 128 Throws on fatal query errors like bad syntax. 129 WARNING: Only returns textual values!!! 130 131 Examples: 132 ---------------- 133 Connection conn; // An established connection 134 135 conn.exec("CREATE TABLE IF NOT EXISTS test_table"); 136 ---------------- 137 */ 138 Result exec(string command) 139 { 140 PGresult* res = PQexec(_connection, cast(const char*)command.toStringz); 141 return Result(res); 142 } 143 144 unittest 145 { 146 auto res = c.exec("SELECT 1::INT4 AS int4, 2::INT8 AS some_long"); 147 writeln(" * exec for selecting INT4 and INT8"); 148 assert(res.rows == 1); 149 assert(res.columns == 2); 150 151 auto r = res[0]; 152 assert(r[0].as!string == "1"); 153 assert(r[1].as!string == "2"); 154 155 writeln(" * Row opIndex(int) and opIndex(string) equality "); 156 assert(r[0] == r["int4"]); 157 assert(r[1] == r["some_long"]); 158 159 } 160 161 /// ditto, async 162 bool send(string command) 163 { 164 return PQsendQuery(_connection, cast(const char*)command.toStringz) == 1; 165 } 166 167 /** 168 Executes the given string with given params 169 170 Params should be given as $1, $2, ... $n in the actual command. 171 All params are sent in a binary format and should not be escaped. 172 If a param's type cannot be inferred, this method will throw an exception, 173 in this case, either specify the type using the :: (cast) notation or 174 make sure the type can be inferred by PostgreSQL in your query. 175 176 Examples: 177 ---------------- 178 Connection conn; // An established connection 179 180 conn.execParams("SELECT $1::string, $2::int, $3::double"); 181 ---------------- 182 183 See also: 184 http://www.postgresql.org/docs/9.3/static/libpq-exec.html 185 */ 186 Result execParams(T...)(string command, T params) 187 { 188 Value[] values; 189 foreach(param; params) 190 values ~= Value(param); 191 192 return execParams(command, values); 193 } 194 195 196 /// ditty, but async 197 void sendParams(T...)(string command, T params) 198 { 199 Value[] values; 200 foreach(param; params) 201 values ~= Value(param); 202 203 execParams(command, values, true); 204 } 205 206 /// ditto, but taking an array of params, instead of variadic template 207 Result execParams(string command, Value[] params, bool async = false) 208 { 209 const char* cStr = cast(const char*) command.toStringz; 210 211 auto pTypes = params.paramTypes; 212 auto pValues = params.paramValues; 213 auto pLengths = params.paramLengths; 214 auto pFormats = params.paramFormats; 215 216 if (async) 217 { 218 PQsendQueryParams( 219 _connection, 220 cStr, 221 params.length.to!int, 222 pTypes.ptr, 223 cast(const(char*)*)pValues.ptr, 224 pLengths.ptr, 225 pFormats.ptr, 226 1); 227 228 return Result(null); 229 } 230 else 231 return Result(PQexecParams( 232 _connection, 233 cStr, 234 params.length.to!int, 235 pTypes.ptr, 236 cast(const(char*)*)pValues.ptr, 237 pLengths.ptr, 238 pFormats.ptr, 239 1)); 240 } 241 242 /// ditto, async 243 void sendParams(string command, Value[] params) 244 { 245 execParams(command, params, true); 246 } 247 248 unittest 249 { 250 writeln("\t * execParams"); 251 writeln("\t\t * Rows and cols"); 252 253 // we're not testing value here, specify types in the query to avoid any oid issues 254 auto res = c.execParams("SELECT 1::INT4 AS int4, 2::INT8 AS some_long", []); 255 assert(res.rows == 1); 256 assert(res.columns == 2); 257 258 writeln("\t\t * Static values"); 259 auto r = res[0]; 260 assert(r[0].as!int == 1); 261 assert(r[1].as!long == 2); 262 263 writeln("\t\t * opIndex(int) and opIndex(string) equality"); 264 assert(r[0] == r["int4"]); 265 assert(r[1] == r["some_long"]); 266 267 int int4 = 1; 268 long int8 = 2; 269 string str = "foo bar baz"; 270 float float4 = 3.14; 271 double float8 = 3.1415; 272 273 writeln("\t\t * Passed values"); 274 res = c.execParams( 275 "SELECT $1::INT4, $2::INT8, $3::TEXT, $4::FLOAT4, $5::FLOAT8", 276 int4, 277 int8, 278 str, 279 float4, 280 float8); 281 282 assert(res.rows == 1); 283 r = res[0]; 284 285 // This should probably be tested by the serialisers, not here. 286 assert(r[0].as!int == int4); 287 assert(r[1].as!long == int8); 288 assert(r[2].as!string == str); 289 assert(r[3].as!float == float4); 290 assert(r[4].as!double == float8); 291 } 292 293 /** 294 Returns the last error message 295 296 Examples: 297 -------------------- 298 Connection conn; // An established connection 299 300 writeln(conn.errorMessage); 301 -------------------- 302 */ 303 @property string errorMessage() 304 { 305 return PQerrorMessage(_connection).to!string; 306 } 307 308 unittest 309 { 310 writeln("\t * errorMessage"); 311 try 312 { 313 c.execParams("SELECT_BADSYNTAX $1::INT4", 1); 314 } 315 catch {} 316 317 assert(c.errorMessage.length != 0); 318 } 319 320 /** 321 Escapes a string, to be used in a query 322 */ 323 string escapeLiteral(string str) 324 { 325 const(char)* cStr = str.toStringz; 326 auto esc = PQescapeLiteral(_connection, cStr, str.length); 327 328 if (esc == null) 329 throw new DPQException("escapeLiteral failed: " ~ this.errorMessage); 330 331 str = esc.fromStringz.dup; // make a copy, escaped data must be freed 332 PQfreemem(esc); 333 return str; 334 } 335 336 /** 337 Escapes an identifier (column, function, table name, ...) to be used in a query. 338 */ 339 string escapeIdentifier(string str) 340 { 341 const(char)* cStr = str.toStringz; 342 auto esc = PQescapeIdentifier(_connection, cStr, str.length); 343 344 if (esc == null) 345 throw new DPQException("escapeIdentifier failed: " ~ this.errorMessage); 346 347 str = esc.fromStringz.dup; // make a copy, escaped data must be freed 348 PQfreemem(esc); 349 return str; 350 } 351 352 353 /** 354 Will create the relation and return the queries (foreign key and index creation) 355 that still need to be ran, usually after ALL the relations are created. 356 */ 357 private string[] createRelation(T)() 358 { 359 alias members = serialisableMembers!T; 360 361 string relName = SerialiserFor!T.nameForType!T; 362 string escRelName = escapeIdentifier(relName); 363 364 // A list of columns in the table 365 string[] columns; 366 // Queries that must be ran after the table is created 367 string[] additionalQueries; 368 369 foreach (mName; members) 370 { 371 // Is there a better way to do this? 372 enum member = "T." ~ mName; 373 374 // If the member is a property, typeof will fail with the wront context error 375 // so we construct it and then check the type 376 static if (is(FunctionTypeOf!(mixin(member)) == function)) 377 alias MType = RealType!(typeof(mixin("T()." ~ mName))); 378 else 379 alias MType = RealType!(typeof(mixin(member))); 380 381 alias serialiser = SerialiserFor!MType; 382 383 // The attribute's name 384 string attrName = attributeName!(mixin(member)); 385 string escAttrName = escapeIdentifier(attrName); 386 387 // And the type, the user-specified type overwrites anything else 388 static if (hasUDA!(mixin(member), PGTypeAttribute)) 389 string attrType = getUDAs!(mixin(member), PGTypeAttribute)[0].type; 390 else 391 string attrType = serialiser.nameForType!MType; 392 393 string attr = escAttrName ~ " " ~ attrType; 394 395 // A type must be created before using it. 396 serialiser.ensureExistence!MType(this); 397 398 // Take care of primary keys 399 static if (hasUDA!(mixin(member), PrimaryKeyAttribute)) 400 attr ~= " PRIMARY KEY"; 401 // And foreign key 402 else static if (hasUDA!(mixin(member), ForeignKeyAttribute)) 403 { 404 enum uda = getUDAs!(mixin(member), ForeignKeyAttribute)[0]; 405 406 // Create the FK 407 additionalQueries ~= 408 `ALTER TABLE %s ADD CONSTRAINT %s FOREIGN KEY(%s) REFERENCES %s (%s)`.format( 409 escRelName, 410 escapeIdentifier("%s_%s_fk_%s".format(relName, attrName, uda.relation)), 411 escAttrName, 412 escapeIdentifier(uda.relation), 413 escapeIdentifier(uda.pkey)); 414 415 // Also create an index on the foreign key 416 additionalQueries ~= "CREATE INDEX %s ON %s (%s)".format( 417 escapeIdentifier("%s_%s_fk_index".format(relName, attrName)), 418 escRelName, 419 escAttrName); 420 } 421 else static if (hasUDA!(mixin(member), IndexAttribute)) 422 { 423 enum uda = getUDAs!(mixin(member), IndexAttribute)[0]; 424 425 additionalQueries ~= "CREATE%sINDEX %s ON %s (%s)".format( 426 uda.unique ? " UNIQUE " : " ", 427 escapeIdentifier("%s_%s_fk_index".format(relName, attrName)), 428 escRelName, 429 escAttrName); 430 } 431 432 // Custom suffixes 433 static if (hasUDA!(mixin(member), ColumnSuffixAttribute)) 434 { 435 enum suffix = getUDAs!(mixin(member), ColumnSuffixAttribute)[0].suffix; 436 attr ~= " " ~ suffix; 437 } 438 439 columns ~= attr; 440 } 441 442 // Create the table 443 exec("CREATE TABLE IF NOT EXISTS %s (%s)".format(escRelName, columns.join(", "))); 444 445 addOidsFor(relName); 446 447 return additionalQueries; 448 } 449 450 package void addOidsFor(string typeName) 451 { 452 auto r = execParams("SELECT $1::regtype::oid, $2::regtype::oid", typeName, typeName ~ "[]"); 453 Oid typeOid = r[0][0].as!int; 454 Oid arrOid = r[0][1].as!int; 455 456 CompositeTypeSerialiser.addCustomOid(typeName, typeOid); 457 ArraySerialiser.addCustomOid(typeOid, arrOid); 458 } 459 460 461 /** 462 Generates and runs the DDL from the given structures 463 464 Attributes from dpq.attributes should be used to define 465 primary keys, indexes, and relationships. 466 467 A custom type can be specified with the @type attribute. 468 469 Examples: 470 ----------------------- 471 Connection conn; // An established connection 472 struct User 473 { 474 @serial8 @PKey long id; 475 string username; 476 byte[] passwordHash; 477 }; 478 479 struct Article { ... }; 480 481 conn.ensureSchema!(User, Article); 482 ----------------------- 483 */ 484 void ensureSchema(T...)(bool createType = false) 485 { 486 import std.stdio; 487 string[] additional; 488 489 foreach (Type; T) 490 additional ~= createRelation!Type; 491 492 foreach (cmd; additional) 493 try { exec(cmd); } catch {} // Horrible, I know, but this just means the constraint/index already exists 494 } 495 496 unittest 497 { 498 // Probably needs more thorough testing, let's assume right now 499 // everything is correct if the creating was successful. 500 501 writeln("\t * ensureSchema"); 502 struct Inner 503 { 504 string innerStr; 505 int innerInt; 506 } 507 508 struct TestTable1 509 { 510 @serial8 @PK long id; 511 string str; 512 int n; 513 Inner inner; 514 } 515 516 c.ensureSchema!TestTable1; 517 518 auto res = c.execParams( 519 "SELECT COUNT(*) FROM pg_catalog.pg_tables WHERE tablename = $1", 520 relationName!TestTable1); 521 522 assert(res.rows == 1); 523 assert(res[0][0].as!long == 1); 524 525 c.exec("DROP TABLE " ~ relationName!TestTable1); 526 c.exec("DROP TYPE \"" ~ relationName!Inner ~ "\" CASCADE"); 527 } 528 529 /** 530 Returns the requested structure or a Nullable null value if no rows are returned 531 532 This method queries for the given structure by its primary key. If no 533 primary key can be found, a compile-time error will be generated. 534 535 Examples: 536 ---------------------- 537 Connection conn; // An established connection 538 struct User 539 { 540 @serial @PKey int id; 541 ... 542 }; 543 544 auto user = conn.findOne!User(1); // will search by the id attribute 545 ---------------------- 546 */ 547 Nullable!T findOne(T, U)(U id) 548 { 549 return findOneBy!T(primaryKeyAttributeName!T, id); 550 } 551 552 unittest 553 { 554 writeln("\t * findOne(T)(U id), findOneBy, findOne"); 555 struct Testy 556 { 557 @serial @PK int id; 558 string foo; 559 int bar; 560 long baz; 561 int[] intArr; 562 } 563 564 c.ensureSchema!Testy; 565 566 writeln("\t\t * Null result"); 567 auto shouldBeNull = c.findOne!Testy(0); 568 assert(shouldBeNull.isNull); 569 570 c.exec( 571 "INSERT INTO %s (id, foo, bar, baz, %s) VALUES (1, 'somestr', 2, 3, '{1,2,3}')".format( 572 relationName!Testy, attributeName!(Testy.intArr))); 573 574 writeln("\t\t * Valid result"); 575 Testy t = c.findOne!Testy(1); 576 assert(t.id == 1, `t.id == 1` ); 577 assert(t.foo == "somestr", `t.foo == "somestr"`); 578 assert(t.bar == 2, `t.bar == 2`); 579 assert(t.baz == 3, `t.baz == 3`); 580 assert(t.intArr == [1,2,3], `t.intArr == [1,2,3]`); 581 582 writeln("\t\t * findOne with custom filter"); 583 Testy t2 = c.findOne!Testy("id = $1", 1); 584 assert(t == t2); 585 586 c.exec("DROP TABLE " ~ relationName!Testy); 587 } 588 589 /** 590 Returns the requested structure, searches by the given column name 591 with the given value 592 If not rows are returned, a Nullable null value is returned 593 594 Examples: 595 ---------------------- 596 Connection conn; // An established connection 597 struct User 598 { 599 @serial @PKey int id; 600 ... 601 }; 602 603 auto user = conn.findOneBy!User("id", 1); // will search by "id" 604 ---------------------- 605 */ 606 Nullable!T findOneBy(T, U)(string col, U val) 607 { 608 import std.stdio; 609 610 auto members = AttributeList!T; 611 612 QueryBuilder qb; 613 qb.select(members) 614 .from(relationName!T) 615 .where([col: val]) 616 .limit(1); 617 618 auto q = qb.query(this); 619 620 auto r = q.run(); 621 if (r.rows == 0) 622 return Nullable!T.init; 623 624 //return T(); 625 626 auto res = deserialise!T(r[0]); 627 return Nullable!T(res); 628 } 629 630 /** 631 Returns the requested structure, searches by the specified filter 632 with given params 633 634 The filter is not further escaped, so programmer needs to make sure 635 not to properly escape or enclose reserved keywords (like user -> "user") 636 so PostgreSQL can understand them. 637 638 If not rows are returned, a Nullable null value is returned 639 640 Examples: 641 ---------------------- 642 Connection conn; // An established connection 643 struct User 644 { 645 @serial @PKey int id; 646 string username; 647 int posts; 648 }; 649 650 auto user = conn.findOne!User("username = $1 OR posts > $2", "foo", 42); 651 if (!user.isNull) 652 { 653 ... // do something 654 } 655 ---------------------- 656 */ 657 Nullable!T findOne(T, U...)(string filter, U vals) 658 { 659 QueryBuilder qb; 660 qb.select(AttributeList!T) 661 .from(relationName!T) 662 .where(filter) 663 .limit(1); 664 665 auto q = qb.query(this); 666 auto r = q.run(vals); 667 668 if (r.rows == 0) 669 return Nullable!T.init; 670 671 auto res = deserialise!T(r[0]); 672 return Nullable!T(res); 673 } 674 675 /** 676 Returns an array of the specified type, filtered with the given filter and 677 params 678 679 If no rows are returned by PostgreSQL, an empty array is returned. 680 681 Examples: 682 ---------------------- 683 Connection conn; // An established connection 684 struct User 685 { 686 @serial @PKey int id; 687 string username; 688 int posts; 689 }; 690 691 auto users = conn.find!User("username = $1 OR posts > $2", "foo", 42); 692 foreach (u; users) 693 { 694 ... // do something 695 } 696 ---------------------- 697 */ 698 T[] find(T, U...)(string filter = "", U vals = U.init) 699 { 700 QueryBuilder qb; 701 qb.select(AttributeList!T) 702 .from(relationName!T) 703 .where(filter); 704 705 auto q = qb.query(this); 706 707 T[] res; 708 foreach (r; q.run(vals)) 709 res ~= deserialise!T(r); 710 711 return res; 712 } 713 714 /** 715 Returns an array of the specified type, filtered with the given filter and 716 params with length limited by limit 717 718 If no rows are returned by PostgreSQL, an empty array is returned. 719 If limit is set to -1 then all filtered rows are returned. 720 721 Examples: 722 ---------------------- 723 Connection conn; // An established connection 724 struct User 725 { 726 @serial @PKey int id; 727 string username; 728 int posts; 729 }; 730 731 auto users = conn.findL!User("username = $1 OR posts > $2", 5, "foo", 42); 732 foreach (u; users) 733 { 734 ... // do something 735 } 736 ---------------------- 737 */ 738 T[] findL(T, U...)(string filter, int limit, U vals) 739 { 740 QueryBuilder qb; 741 qb.select(AttributeList!T) 742 .from(relationName!T) 743 .where(filter) 744 .limit(limit); 745 746 auto q = qb.query(this); 747 748 T[] res; 749 foreach (r; q.run(vals)) 750 res ~= deserialise!T(r); 751 752 return res; 753 } 754 755 unittest 756 { 757 writeln("\t * find, findL"); 758 759 @relation("find_test") 760 struct Test 761 { 762 @serial @PK int id; 763 @attr("my_n") int n; 764 } 765 766 c.ensureSchema!Test; 767 768 Test t; 769 t.n = 1; 770 771 c.insert(t); 772 c.insert(t); 773 ++t.n; 774 c.insert(t); 775 c.insert(t); 776 c.insert(t); 777 778 Test[] ts = c.find!Test("my_n = $1", 1); 779 assert(ts.length == 2); 780 ts = c.find!Test("my_n > 0"); 781 assert(ts.length == 5); 782 ts = c.find!Test("false"); 783 assert(ts.length == 0); 784 ts = c.findL!Test("0 = 0", -1); 785 assert(ts.length == 5); 786 ts = c.findL!Test("my_n = $1", -1, 2); 787 assert(ts.length == 3); 788 ts = c.findL!Test("my_n = $1", 2, 2); 789 assert(ts.length == 2); 790 791 c.exec("DROP TABLE find_test"); 792 } 793 794 /** 795 Updates records filtered by the filter string, setting the values 796 as specified in the update string. Both should be SQL-syntax 797 798 Useful when updating just a single or a bunch of values in the table, or 799 when setting the values relatively to their current value. 800 801 Params: 802 filter = the SQL filter string 803 update = the SQL update string 804 vals = values to be used in the query 805 806 Examples: 807 ---------------- 808 Connection c; // an established connection 809 struct User { int id; int posts ...} 810 c.update!User("id = $1", "posts = posts + $2", 123, 1); 811 ---------------- 812 */ 813 int update(T, U...)(string filter, string update, U vals) 814 { 815 QueryBuilder qb; 816 qb.update(relationName!T) 817 .set(update) 818 .where(filter); 819 820 auto r = qb.query(this).run(vals); 821 return r.rows; 822 } 823 824 unittest 825 { 826 writeln("\t * update"); 827 828 @relation("update_test") 829 struct Test 830 { 831 @serial @PK int id; 832 int n; 833 } 834 835 c.ensureSchema!Test; 836 837 Test t; 838 t.n = 5; 839 c.insert(t); 840 841 int nUpdates = c.update!Test("n = $1", "n = $2", 5, 123); 842 assert(nUpdates == 1, `nUpdates == 1`); 843 844 t = c.findOneBy!Test("n", 123); 845 assert(t.n == 123, `t.n == 123`); 846 847 writeln("\t\t * async"); 848 c.updateAsync!Test("n = $1", "n = $2", 123, 6); 849 auto r = c.nextResult(); 850 851 assert(r.rows == 1); 852 assert(!c.findOneBy!Test("n", 6).isNull); 853 854 c.exec("DROP TABLE update_test"); 855 } 856 857 /// ditto, async 858 void updateAsync(T, U...)(string filter, string update, U vals) 859 { 860 QueryBuilder qb; 861 qb.update(relationName!T) 862 .set(update) 863 .where(filter); 864 865 qb.query(this).runAsync(vals); 866 } 867 868 /** 869 Similar to above update, but instead of acceptign a filter and and update string, 870 always filters by the PK and updates with absolute values from the updates AA. 871 872 Params: 873 id = the value of the relation's PK to filter by 874 updates = an AA, mapping column name to the new value to be set for the column 875 async = optionally send this query async 876 877 Examples: 878 ------------------ 879 Connection c; // en established connection 880 struct User { @PK int id; int x; string y } 881 c.update!User(1, [ 882 "x": Value(2), 883 "y": Value("Hello there")]); 884 ------------------ 885 */ 886 int update(T, U)(U id, Value[string] updates, bool async = false) 887 { 888 QueryBuilder qb; 889 890 qb.update(relationName!T) 891 .set(updates) 892 .where([primaryKeyAttributeName!T: id]); 893 894 auto q = qb.query(this); 895 896 if (async) 897 { 898 q.runAsync(); 899 return -1; 900 } 901 902 auto r = q.run(); 903 return r.rows; 904 } 905 906 unittest 907 { 908 writeln("\t * update with AA updates"); 909 910 @relation("update_aa_test") 911 struct Test 912 { 913 @serial @PK int id; 914 int n; 915 } 916 c.ensureSchema!Test; 917 918 Test t; 919 t.n = 1; 920 c.insert(t); 921 922 int rows = c.update!Test(1, ["n": Value(2)]); 923 assert(rows == 1, `r.rows == 1`); 924 925 c.exec("DROP TABLE update_aa_test"); 926 } 927 928 // ditto, but async 929 void updateAsync(T, U)(U id, Value[string] updates) 930 { 931 update!T(id, updates, true); 932 } 933 934 /** 935 Similar to above, but accepts the whole structure as an update param. 936 Filters by the PK, updates ALL the values in the filtered rows. 937 938 Params: 939 id = value of the relation's PK to filter by 940 updates = the structure that will provide values for the UPDATE 941 asnyc = whether the query should be sent async 942 943 Examples: 944 ------------------ 945 Connection c; // an established connection 946 struct User { @PK int id; int a; int b; } 947 c.update(1, myUser); 948 ------------------ 949 */ 950 int update(T, U)(U id, T updates, bool async = false) 951 { 952 import dpq.attributes; 953 954 QueryBuilder qb; 955 956 qb.update(relationName!T) 957 .where([primaryKeyAttributeName!T: id]); 958 959 foreach (m; serialisableMembers!T) 960 qb.set(attributeName!(mixin("T." ~ m)), __traits(getMember, updates, m)); 961 962 auto q = qb.query(this); 963 if (async) 964 { 965 qb.query(this).runAsync(); 966 return -1; 967 } 968 969 auto r = q.run(); 970 return r.rows; 971 } 972 973 // ditto, async 974 void updateAsync(T, U)(U id, T updates) 975 { 976 update!T(id, updates, true); 977 } 978 979 unittest 980 { 981 writeln("\t * update with object"); 982 983 @relation("update_object_test") 984 struct Test 985 { 986 @serial @PK int id; 987 int n; 988 } 989 c.ensureSchema!Test; 990 991 Test t; 992 t.n = 1; 993 t.id = 1; // assumptions <3 994 995 c.insert(t); 996 997 t.n = 2; 998 c.update!Test(1, t); 999 1000 t = c.findOne!Test(1); 1001 assert(t.n == 2); 1002 1003 t.n = 3; 1004 c.updateAsync!Test(1, t); 1005 auto r = c.nextResult(); 1006 1007 writeln("\t\t * async"); 1008 assert(r.rows == 1); 1009 1010 c.exec("DROP TABLE update_object_test"); 1011 } 1012 1013 /** 1014 Inserts the given structure, returning whatever columns are specified by the 1015 second param as a normal Result. 1016 1017 Equivalent to specifying RETURNING at the end of the query. 1018 1019 Examples: 1020 ------------------- 1021 Connection c; // an established connection 1022 struct Data { @PK int id, int a; int b; } 1023 Data myData; 1024 auto result = c.insert(myData, "id"); 1025 ------------------- 1026 */ 1027 Result insertR(T)(T val, string ret = "") 1028 if(!isArray!T) 1029 { 1030 QueryBuilder qb; 1031 qb.insert(relationName!T, AttributeList!(T, true, true)); 1032 if (ret.length > 0) 1033 qb.returning(ret); 1034 1035 qb.addValues!T(val); 1036 1037 return qb.query(this).run(); 1038 } 1039 1040 /** 1041 Inserts the given array structures in a singl query, returning whatever columns are specified by the 1042 second param as a normal Result. 1043 1044 Equivalent to specifying RETURNING at the end of the query. 1045 1046 Examples: 1047 ------------------- 1048 Connection c; // an established connection 1049 struct Data { @PK int id, int a; int b; } 1050 Data[2] myData; 1051 auto result = c.insert(myData, "id"); 1052 ------------------- 1053 */ 1054 Result insertR(T)(T vals, string ret = "") 1055 if(isArray!T) 1056 { 1057 alias BT = BaseType!T; 1058 1059 if (!vals.length) 1060 return Result.init; 1061 1062 QueryBuilder qb; 1063 qb.insert(relationName!BT, AttributeList!(BT, true, true)); 1064 if (ret.length > 0) 1065 qb.returning(ret); 1066 1067 foreach (val; vals) 1068 qb.addValues!BT(val); 1069 1070 return qb.query(this).run(); 1071 } 1072 1073 /** 1074 Inserts the given structure to the DB 1075 1076 Examples: 1077 --------------- 1078 Connection c; // An established connection 1079 struct User {@PK @serial int id; int a } 1080 User myUser; 1081 c.insert(myUser); 1082 --------------- 1083 */ 1084 bool insert(T)(T val, bool async = false) 1085 if(!isArray!T) 1086 { 1087 QueryBuilder qb; 1088 qb.insert(relationName!T, AttributeList!(T, true, true)); 1089 1090 qb.addValues!T(val); 1091 1092 if (async) 1093 return qb.query(this).runAsync(); 1094 1095 auto r = qb.query(this).run(); 1096 return r.rows > 0; 1097 } 1098 1099 /** 1100 Inserts the given array of structures to the DB as one query 1101 1102 Examples: 1103 --------------- 1104 Connection c; // An established connection 1105 struct User {@PK @serial int id; int a } 1106 User[2] myUsers; 1107 c.insert(myUsers); 1108 --------------- 1109 */ 1110 int insert(T)(T vals, bool async = false) 1111 if(isArray!T) 1112 { 1113 alias BT = BaseType!T; 1114 1115 QueryBuilder qb; 1116 qb.insert(relationName!BT, AttributeList!(BT, true, true)); 1117 1118 if (!vals.length) 1119 return 0; 1120 1121 foreach (val; vals) 1122 qb.addValues!BT(val); 1123 1124 if (async) 1125 return qb.query(this).runAsync(); 1126 1127 auto r = qb.query(this).run(); 1128 return r.rows; 1129 } 1130 1131 unittest 1132 { 1133 writeln("\t * insert"); 1134 1135 @relation("insert_test_inner") 1136 struct Inner 1137 { 1138 int bar; 1139 } 1140 1141 @relation("insert_test") 1142 struct Test 1143 { 1144 int n; 1145 Nullable!int n2; 1146 Inner foo; 1147 } 1148 c.ensureSchema!Test; 1149 1150 Test t; 1151 t.n = 1; 1152 t.n2 = 2; 1153 t.foo.bar = 2; 1154 1155 auto r = c.insert(t); 1156 assert(r == true); 1157 1158 auto r2 = c.insertR(t, "n"); 1159 assert(r2.rows == 1); 1160 assert(r2[0][0].as!int == t.n); 1161 1162 Test t2 = c.findOneBy!Test("n", 1); 1163 assert(t2 == t, t.to!string ~ " != " ~ t2.to!string); 1164 1165 Test[] t_arr; 1166 t_arr ~= Test.init; 1167 t_arr[0].n = 1; 1168 t_arr[0].n2 = 2; 1169 t_arr[0].foo.bar = 3; 1170 t_arr ~= Test.init; 1171 t_arr[1].n = 4; 1172 t_arr[1].n2 = 5; 1173 t_arr[1].foo.bar = 6; 1174 1175 auto r3 = c.insert(t_arr); 1176 assert(r3 == 2); 1177 1178 auto r4 = c.insertR(t_arr, "n"); 1179 assert(r4[0][0].as!int == t_arr[0].n); 1180 assert(r4[1][0].as!int == t_arr[1].n); 1181 1182 writeln("\t\t * async"); 1183 t.n = 123; 1184 t.n2.nullify; 1185 c.insertAsync(t); 1186 1187 auto res = c.nextResult(); 1188 assert(res.rows == 1); 1189 t2 = c.findOneBy!Test("n", 123); 1190 assert(t.n2.isNull); 1191 assert(t2.n2.isNull); 1192 1193 c.exec("DROP TABLE insert_test"); 1194 c.exec("DROP TYPE \"%s\" CASCADE".format(relationName!Inner)); 1195 } 1196 1197 /// ditto, async 1198 void insertAsync(T)(T val) 1199 { 1200 insert(val, true); 1201 } 1202 1203 /** 1204 Deletes the record in the given table, by its PK 1205 1206 Examples: 1207 --------------- 1208 Connection c; // An established connection 1209 struct User {@PK @serial int id; int a } 1210 c.remove!User(1); 1211 --------------- 1212 */ 1213 int remove(T, U)(U id) 1214 { 1215 QueryBuilder qb; 1216 qb.remove!T 1217 .where([primaryKeyAttributeName!T: id]); 1218 1219 return qb.query(this).run().rows; 1220 } 1221 1222 // ditto, async 1223 bool removeAsync(T, U)(U id) 1224 { 1225 QueryBuilder qb; 1226 qb.remove!T 1227 .where([primaryKeyAttributeName!T: id]); 1228 1229 return qb.query(this).runAsync() == 1; 1230 } 1231 1232 1233 /** 1234 Deletes rows in the specified relation, filtered by the given filter string and values 1235 1236 Examples: 1237 --------------- 1238 Connection c; // An established connection 1239 struct User { @PK @serial int id; int posts } 1240 c.remove!User("id > $1 AND posts == $2", 50, 0); 1241 --------------- 1242 */ 1243 int remove(T, U...)(string filter, U vals) 1244 { 1245 QueryBuilder qb; 1246 qb.remove!T 1247 .where(filter); 1248 1249 foreach (v; vals) 1250 qb.addValue(v); 1251 1252 return qb.query(this).run().rows; 1253 } 1254 1255 /// ditto, async 1256 bool removeAsync(T, U...)(string filter, U vals) 1257 { 1258 QueryBuilder qb; 1259 qb.remove!T 1260 .where(filter); 1261 1262 foreach (v; vals) 1263 qb.addValue(v); 1264 1265 return qb.query(this).runAsync() == 1; 1266 } 1267 1268 unittest 1269 { 1270 @relation("remove_test") 1271 struct Test 1272 { 1273 @serial @PK int id; 1274 int n; 1275 } 1276 c.ensureSchema!Test; 1277 1278 foreach (i; 0 .. 10) 1279 c.insert(Test(0, i)); 1280 1281 writeln("\t * remove(id)"); 1282 int n = c.remove!Test(1); 1283 assert(n == 1, `n == 1`); 1284 1285 1286 writeln("\t\t * async"); 1287 c.removeAsync!Test(2); 1288 auto r = c.nextResult(); 1289 assert(r.rows == 1, `r.rows == 1`); 1290 1291 writeln("\t * remove(filter, vals...)"); 1292 n = c.remove!Test("id IN($1,$2,$3,$4,$5)", 3, 4, 5, 6, 7); 1293 assert(n == 5); 1294 1295 writeln("\t\t * async"); 1296 c.removeAsync!Test("id >= $1", 7); 1297 r = c.nextResult(); 1298 assert(r.rows == 3); 1299 1300 c.exec("DROP TABLE remove_test"); 1301 } 1302 1303 /** 1304 Returns a count of the rows matching the filter in the specified relation. 1305 Filter can be empty or not given to select a count of all the rows in the relation. 1306 1307 Examples: 1308 --------------- 1309 Connection c; // An established connection 1310 struct User {@PK @serial int id; int a } 1311 long nUsers = c.count!User; 1312 nUsers = c.count!User("id > $1", 123); 1313 --------------- 1314 */ 1315 long count(T, U...)(string filter = "", U vals = U.init) 1316 { 1317 import dpq.query; 1318 auto q = Query(this); 1319 string str = `SELECT COUNT(*) FROM "%s"`.format(relationName!T); 1320 1321 if (filter.length > 0) 1322 str ~= " WHERE " ~ filter; 1323 1324 q = str; 1325 auto r = q.run(vals); 1326 1327 return r[0][0].as!long; 1328 } 1329 1330 unittest 1331 { 1332 writeln("\t * count"); 1333 1334 @relation("test_count") 1335 struct Test 1336 { 1337 @serial @PK int id; 1338 int n; 1339 } 1340 1341 c.ensureSchema!Test; 1342 1343 Test t; 1344 c.insert(t); 1345 1346 assert(c.count!Test == 1, `count == 1`); 1347 c.insert(t); 1348 assert(c.count!Test == 2, `count == 2`); 1349 1350 c.exec("DROP TABLE test_count"); 1351 } 1352 1353 /** 1354 Equivalent to calling PQisBusy from libpq. Only useful if you're doing async 1355 stuff manually. 1356 */ 1357 bool isBusy() 1358 { 1359 return PQisBusy(_connection) == 1; 1360 } 1361 1362 unittest 1363 { 1364 writeln("\t * isBusy"); 1365 1366 assert(c.isBusy() == false); 1367 1368 c.send("SELECT 1::INT"); 1369 1370 // This could fail in theory, but in practice ... fat chance. 1371 assert(c.isBusy() == true); 1372 1373 c.nextResult(); 1374 assert(c.isBusy() == false); 1375 } 1376 1377 1378 /** 1379 Blocks until a result is read, then returns it 1380 1381 If no more results remain, a null result will be returned 1382 1383 Make sure to call this until a null is returned or just use allResults. 1384 */ 1385 Result nextResult() 1386 { 1387 PGresult* res = PQgetResult(_connection); 1388 return Result(res); 1389 } 1390 1391 /** 1392 Calls nextResult until the value returned is null, the returns them 1393 as an array. 1394 */ 1395 Result[] allResults() 1396 { 1397 Result[] res; 1398 1399 PGresult* r; 1400 while ((r = PQgetResult(_connection)) != null) 1401 res ~= Result(r); 1402 1403 return res; 1404 } 1405 1406 /** 1407 Calls nextResult until null is returned, then retuns only the last non-null result. 1408 */ 1409 Result lastResult() 1410 { 1411 Result res; 1412 1413 PGresult* r; 1414 while ((r = PQgetResult(_connection)) != null) 1415 res = Result(r); 1416 1417 return res; 1418 } 1419 1420 unittest 1421 { 1422 writeln("\t * nextResult"); 1423 auto x = c.nextResult(); 1424 assert(x.isNull); 1425 1426 int int1 = 1; 1427 int int2 = 2; 1428 1429 c.sendParams("SELECT $1", int1); 1430 1431 // In every way the same as lastResult 1432 Result r, t; 1433 while(!(t = c.nextResult()).isNull) 1434 r = t; 1435 1436 assert(r.rows == 1); 1437 assert(r.columns == 1); 1438 assert(r[0][0].as!int == int1); 1439 1440 writeln("\t * lastResult"); 1441 c.sendParams("SELECT $1", int2); 1442 r = c.lastResult(); 1443 1444 assert(r.rows == 1); 1445 assert(r.columns == 1); 1446 assert(r[0][0].as!int == int2); 1447 } 1448 1449 Result prepare(T...)(string name, string command, T paramTypes) 1450 { 1451 Oid[] oids; 1452 foreach (pType; paramTypes) 1453 oids ~= pType; 1454 1455 char* cName = cast(char*) name.toStringz; 1456 char* cComm = cast(char*) command.toStringz; 1457 1458 auto p = PreparedStatement(this, name, command, oids); 1459 _prepared[name] = p; 1460 1461 return Result(PQprepare( 1462 _connection, 1463 cName, 1464 cComm, 1465 oids.length.to!int, 1466 oids.ptr)); 1467 } 1468 1469 Result execPrepared(string name, Value[] params...) 1470 { 1471 char* cStr = cast(char*) name.toStringz; 1472 1473 return Result(PQexecPrepared( 1474 _connection, 1475 cStr, 1476 params.length.to!int, 1477 cast(char**) params.paramValues.ptr, 1478 params.paramLengths.ptr, 1479 params.paramFormats.ptr, 1480 1)); 1481 } 1482 1483 Result execPrepared(T...)(string name, T params) 1484 { 1485 Value[] vals; 1486 foreach (p; params) 1487 vals ~= Value(p); 1488 1489 return execPrepared(name, vals); 1490 } 1491 1492 bool sendPrepared(string name, Value[] params...) 1493 { 1494 char* cStr = cast(char*) name.toStringz; 1495 1496 return PQsendQueryPrepared( 1497 _connection, 1498 cStr, 1499 params.length.to!int, 1500 cast(char**) params.paramValues.ptr, 1501 params.paramLengths.ptr, 1502 params.paramFormats.ptr, 1503 1) == 1; 1504 } 1505 1506 bool sendPrepared(T...)(string name, T params) 1507 { 1508 Value[] vals; 1509 foreach (p; params) 1510 vals ~= Value(p); 1511 1512 return sendPrepared(name, vals); 1513 } 1514 1515 unittest 1516 { 1517 writeln("\t * prepare"); 1518 // The result of this isn't really all that useful, but as long as it 1519 // throws on errors, it kinda is 1520 c.prepare("prepare_test", "SELECT $1", Type.INT4); 1521 1522 writeln("\t * execPrepared"); 1523 auto r = c.execPrepared("prepare_test", 1); 1524 assert(r.rows == 1); 1525 assert(r[0][0].as!int == 1); 1526 1527 writeln("\t\t * sendPrepared"); 1528 bool s = c.sendPrepared("prepare_test", 1); 1529 assert(s); 1530 1531 r = c.lastResult(); 1532 assert(r.rows == 1); 1533 assert(r[0][0].as!int == 1); 1534 } 1535 1536 /** 1537 Begins a transaction block. 1538 */ 1539 void begin() 1540 { 1541 import dpq.query; 1542 auto q = Query(this, "BEGIN"); 1543 q.run(); 1544 } 1545 1546 /** 1547 Commits current transaction 1548 */ 1549 void commit() 1550 { 1551 import dpq.query; 1552 auto q = Query(this, "COMMIT"); 1553 q.run(); 1554 } 1555 1556 /** 1557 Creates savepoint in the current transaction block. 1558 1559 Params: 1560 name = name of created savepoint 1561 1562 Returns: created Savepoint. 1563 */ 1564 Savepoint savepoint(string name) 1565 { 1566 import dpq.query; 1567 Savepoint s = new Savepoint(name); 1568 auto q = Query(this, "SAVEPOINT " ~ s.name); 1569 q.run(); 1570 1571 return s; 1572 } 1573 1574 /** 1575 Destroys savepoint in the current transaction block. 1576 1577 Params: 1578 s = savepoint to destroy 1579 */ 1580 void releaseSavepoint(Savepoint s) 1581 { 1582 import dpq.query; 1583 auto q = Query(this, "RELEASE SAVEPOINT " ~ s.name); 1584 q.run(); 1585 } 1586 1587 /** 1588 Rollback the current transaction to savepoint. 1589 1590 Params: 1591 s = savepoint to rollback to. If savepoint s is null or no savepoint is specified then transaction 1592 will be rolled back to the begining. 1593 */ 1594 void rollback(Savepoint s = null) 1595 { 1596 import dpq.query; 1597 auto q = Query(this); 1598 1599 if (s is null) 1600 q.command = "ROLLBACK"; 1601 else 1602 q.command = "ROLLBACK TO " ~ s.name; 1603 1604 q.run(); 1605 } 1606 1607 unittest 1608 { 1609 writeln("\t * transaction"); 1610 1611 @relation("transaction_test") 1612 struct Test 1613 { 1614 @serial @PK int id; 1615 string t; 1616 } 1617 1618 c.ensureSchema!Test; 1619 1620 Test t; 1621 t.t = "before transaction"; 1622 auto r = c.insertR(t, "id"); 1623 t.id = r[0][0].as!int; 1624 1625 c.begin(); 1626 t.t = "this value is ignored"; 1627 c.update(t.id, t); 1628 1629 auto s1 = c.savepoint("s1"); 1630 t.t = "before savepoint s2"; 1631 c.update(t.id, t); 1632 1633 auto s2 = c.savepoint("s2"); 1634 t.t = "after savepoint s2"; 1635 c.update(t.id, t); 1636 1637 assert(c.findOne!Test(t.id).t == "after savepoint s2"); 1638 1639 c.rollback(s2); 1640 assert(c.findOne!Test(t.id).t == "before savepoint s2"); 1641 1642 c.rollback(); 1643 assert(c.findOne!Test(t.id).t == "before transaction"); 1644 1645 Connection c2 = Connection("host=127.0.0.1 dbname=test user=test"); 1646 c.begin(); 1647 t.t = "inside transaction"; 1648 c.update(t.id, t); 1649 1650 assert( c.findOne!Test(t.id).t == "inside transaction"); 1651 assert(c2.findOne!Test(t.id).t == "before transaction"); 1652 1653 c.commit(); 1654 assert( c.findOne!Test(t.id).t == "inside transaction"); 1655 assert(c2.findOne!Test(t.id).t == "inside transaction"); 1656 1657 c.exec("DROP TABLE transaction_test"); 1658 c2.close(); 1659 } 1660 1661 ref PreparedStatement prepared(string name) 1662 { 1663 return _prepared[name]; 1664 } 1665 1666 ref PreparedStatement opIndex(string name) 1667 { 1668 return prepared(name); 1669 } 1670 } 1671 1672 1673 /** 1674 Deserialises the given Row to the requested type 1675 1676 Params: 1677 T = (template) type to deserialise into 1678 r = Row to deserialise 1679 */ 1680 T deserialise(T)(Row r, string prefix = "") 1681 { 1682 static if (is(T == class)) 1683 T res = new T(); 1684 else 1685 T res; 1686 foreach (m; serialisableMembers!T) 1687 { 1688 enum member = "T." ~ m; 1689 enum n = attributeName!(mixin(member)); 1690 alias OType = typeof(mixin(member)); 1691 alias MType = RealType!OType; 1692 1693 try 1694 { 1695 auto x = r[prefix ~ n].as!MType; 1696 if (!x.isNull) 1697 __traits(getMember, res, m) = cast(OType) x; 1698 } 1699 catch (DPQException e) 1700 { 1701 if (!isInstanceOf!(Nullable, MType)) 1702 throw e; 1703 } 1704 } 1705 return res; 1706 } 1707 1708 class Savepoint 1709 { 1710 string name; 1711 1712 this(string name) 1713 { 1714 this.name = name; 1715 } 1716 } 1717 1718 /// Hold the last created connection, not to be used outside the library 1719 package Connection* _dpqLastConnection;